PHP-RESQUE重试机制

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了PHP-RESQUE重试机制相关的知识,希望对你有一定的参考价值。

因为php-Resque 的重试需要自己写,网上又没啥轮子,而且resque也很久不更新了,所以自己研究下resque的源码,然后也借鉴了Laravel的队列重试机制,实现了PHP-Resque的重试机制。

设计思路

阅读resque源码,我们知道,resque把失败的队列的数据都放在了resque:failed列表,数据如下。

技术分享图片

我的项目是在yii2下统一处理resque的,
我直接resque源码在新增了retry方法,在每一个failJob的payload增加了attempts尝试次数,
一开始创建的时候attempts为0,重试之后attempts每次加1然后进行retry,直到到达尝试数次才停止重试。

核心代码

1.调用重试的代码

    /**
     * Retry Job
     */
    public function actionRetry()
    {
        $redis = new Redis(‘redis‘);
        while (true) {
            $json = $redis->rPop(‘resque:failed‘);
            $array = json_decode($json, true);
            if ($array) {
                $jobArray = $array[‘payload‘];
                if ($jobArray[‘attempts‘] < $this->retryTimes) {
                    //retry
                    Resque::retry($jobArray, $array[‘queue‘]);
                    echo "Queued job " . $jobArray[‘id‘] . ‘ has retry!‘ . "
";
                } else {
                    //stop retry
                    $redis->lPush(‘resque:failed‘, [$json]);
                }
            }
            //take a sleep
            echo "*** Sleeping for ".$this->sleep. "
";
            sleep($this->sleep);
        }

        return true;
    }
这里可以弄成守护进程一直处理失败的队列任务。

2./php-resque/lib/Resque.php
/**
     * retry job and save it to the specified queue.
     *
     * @param array $jobArray The attempts of the the job .
     * array(
     *  ‘id‘=> The id of the job
     *  ‘class‘ => The name of the class that contains the code to execute the job.
     *  ‘args‘ => Any optional arguments that should be passed when the job is executed.‘‘
     *  ‘attempts‘=> The retry attempts of the job
     * )
     * @param string $queue The name of the queue to place the job in.
     *
     * @return boolean
     */
    public static function retry($jobArray,$queue)
    {
        require_once dirname(__FILE__) . ‘/Resque/Job.php‘;
        $result = Resque_Job::retry($jobArray,$queue);
        if ($result) {
            Resque_Event::trigger(‘afterEnqueue‘, array(
                ‘class‘ => $jobArray[‘class‘],
                ‘args‘  => $jobArray[‘args‘],
                ‘queue‘ => $queue,
            ));
        }

        return true;
    }
3./php-resque/lib/Resque/Job.php
/**
     * retry  job and save it to the specified queue.
     * *
     * @param array $jobArray The data of the job.
     * array(
     *  ‘id‘=> The id of the job
     *  ‘class‘ => The name of the class that contains the code to execute the job.
     *  ‘args‘ => Any optional arguments that should be passed when the job is executed.‘‘
     *  ‘attempts‘=> The retry attempts of the job
     * )
     * @param string $queue The name of the queue to place the job in.
     *
     * @return string
     */
    public static function retry($jobArray,$queue)
    {
        $args = $jobArray[‘args‘];
        if($args !== null && !is_array($args)) {
            throw new InvalidArgumentException(
                ‘Supplied $args must be an array.‘
            );
        }
        $jobArray[‘attempts‘]++;
        Resque::push($queue, array(
            ‘class‘ => $jobArray[‘class‘],
            ‘args‘  => array($args),
            ‘id‘    => $jobArray[‘id‘],
            ‘attempts‘=>$jobArray[‘attempts‘]
        ));
        return true;
    }

结果:

技术分享图片

我这里我设置了重试次数为3次,每隔5秒处理一个队列。

以上是关于PHP-RESQUE重试机制的主要内容,如果未能解决你的问题,请参考以下文章

40.少有人知的 Python“重试机制”

RocketMQ---RocketMQ重试机制

Python 这些“重试机制“你都知道吗?

spring-retry重试机制使用

php-resque 简单的php消息队列

PHP的轻量消息队列php-resque使用说明