ini Beanstalkd队列工作者

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ini Beanstalkd队列工作者相关的知识,希望对你有一定的参考价值。

<?php

declare(strict_types=1);

namespace My\Queue;

use Pheanstalk\Pheanstalk;
use Exception;

class BeanstalkdWorker 
{
    protected $queueClient;

    protected $queueNames;

    protected $handler;

    protected $options = [];

    private $startTime;

    private function __construct(
        Pheanstalk $queueClient,
        array $queueNames,
        callable $handler,
        array $options
    ) {
        $this->queueClient = $queueClient;
        $this->queueNames = $queueNames;
        $this->handler = $handler;
        $this->options = array_merge($this->getDefaultOptions(), $options);
    }

    public static function create(
        Pheanstalk $queueClient,
        $queueNames,
        callable $handler,
        array $options
    ) : self {
        return new self(
            $queueClient,
            $queueNames,
            $handler,
            $logger,
            $options
        );
    }

    private function getDefaultOptions() : array
    {
        return [
            'sleep' => 5,
            'timeout' => 60,
            'memory' => 128,
            'on_error' => function ($job, Exception $error) {
                $this->queueClient->bury($error->getJob());
            }
        ];
    }

    public function run()
    {
        $this->init();

        while (true) {
            $this->runNextJob();

            if ($this->shouldRestart()) {
                $this->stop();
            }
        }
    }

    final protected function init()
    {
        $this->startTime = time();
        
        $this->watchQueues();
    }
  
    protected function watchQueues()
    {
        foreach ($this->queueNames as $queueName) {
            $this->queueClient->watch($queueName);
        }
    }
  
    protected function runNextJob()
    {
        $job = $this->queueClient->reserve(0);

        if (!$job) {
            $this->sleep();
            
            return;
        }

        try {
            $this->processJob($job);
        } catch (Exception $ex) {
            $this->handleJobException($job, $ex);
        }
    }

    protected function processJob($job)
    {
        $handler = $this->handler;
        $handler($job);

        $this->queueClient->delete($job);
    }

    protected function handleJobException($job, $exception)
    {
        $errorHandler = $this->options['on_error'];
        $errorHandler($job, $exception);
    }

    protected function shouldRestart() : bool
    {
        if (
            $this->timeoutReached($this->options['timeout'])
            || $this->memoryExceeded($this->options['memory'])
        ) {
            return true;
        }

        return false;
    }

    protected function timeoutReached(int $timeout) : bool
    {
        return (time() - $this->startTime >= $timeout);
    }
    
    protected function memoryExceeded($memoryLimit) : bool
    {
        return (memory_get_usage() / 1024 / 1024) >= $memoryLimit;
    }

    protected function stop()
    {
        die;
    }

    protected function sleep()
    {
        sleep($this->options['sleep']);
    }
}
<?php

declare(strict_types=1);

use My\BeanstalkdWorker;
use Pheanstalk\Pheanstalk;
use Pheanstalk\Job;

$worker = BeanstalkdWorker::create(
    new Pheanstalk('127.0.0.1'),
    [
        'notifications',
    ],
    function (Job $job) {
        $notification = json_decode($job->getData(), true);
        
        mail($notification['to'], $notification['subject'], $notification['message']);
    },
    [
        'timeout' => 120,
        'memory' => 256,
    ]
);
$worker->run();
[program:notificationsWorker]
command=/usr/bin/php /home/ubuntu/app/bin/notifications_worker.php
process_name=%(program_name)s.%(process_num)s
numprocs=5
directory=/tmp
stdout_logfile=/var/log/supervisor/%(program_name)s.%(process_num)s.stdout.log
autostart=true
autorestart=true
user=ubuntu
exitcodes=0
stopsignal=KILL

以上是关于ini Beanstalkd队列工作者的主要内容,如果未能解决你的问题,请参考以下文章

Beanstalkd 在伪造队列工作人员上同时运行队列中的所有作业

如何安装和使用Beanstalkd工作队列

一种消息和任务队列——beanstalkd

Laravel 队列与 beanstalkd 和 redis 重复

使用Beanstalkd实现队列

消息队列_Beanstalkd-0001.Beanstalkd之轻量级分布式内存队列部署?