TP5 redis 延迟队列

Posted 野香蕉

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了TP5 redis 延迟队列相关的知识,希望对你有一定的参考价值。

TP5.1 下载安装Redis

配置redis信息

<?php
namespace app\\common\\redis;

class RedisHandler
{
    public $provider;

    //创建实例子
    private static $_instance = null;
    //创建redis实例子
    private function __construct()
    {
        $this->provider = new \\Redis();

        $this->provider->connect(config(\'redis.redis_host\'),config(\'redis.redis_port\'));
    }

    final private function __clone()
    {
        
    }

    public static function getInstance(){
        if(!self::$_instance){
            self::$_instance = new RedisHandler();
        }
        return self::$_instance;
    }

    /**
     * @param string $key 有序集key
     * @param number $score 排序值
     * @param string $value 格式化的数据
     * @return int
     */
    public function zAdd($key,$score,$value){
        return $this->provider->zAdd($key,$score,$value);
    }

    /**
     * 获取有序集合数据
     * @param $key
     * @param $start
     * @param $end
     * @param null $withsscores
     * @param array
     */
    public function zRange($key,$start,$end,$withsscores = null){
        return $this->provider->zRange($key,$start,$end,$withsscores);
    }

    /**
     * 删除有序集合
     * @param $key
     * @param $member
     * @param int
     */
    public function zRem($key,$member){
        return $this->provider->zRem($key,$member);
    }
}

创建一个命令

目录为
namespace app\\command;

<?php
namespace app\\command;

use app\\common\\delayqueue\\DelayQueue;

use think\\console\\Command;

use think\\console\\Input;

use think\\console\\Output;

 

class DelayQueueWorker extends Command

{

    const COMMAND_ARGV_1 = \'queue\';

 

    protected function configure()

    {

        $this->setName(\'delay-queue\')->setDescription(\'延迟队列任务进程\');

        $this->addArgument(self::COMMAND_ARGV_1);

    }

 

    protected function execute(Input $input, Output $output)

    {

        $queue = $input->getArgument(self::COMMAND_ARGV_1);

        //参数1 延迟队列表名,对应与redis的有序集key名
        //这边是使用while死循环 来监听 
        while (true) {
            echo $queue."===".time()."\\n";

            DelayQueue::getInstance($queue)->perform();

            usleep(1000000);

        }

    }

}

队列创建方法

<?php
namespace app\\common\\delayqueue;

use app\\common\\redis\\RedisHandler;

class DelayQueue
{
    //默认参数
    private $prefix = "delay_queue:";

    private $queue;
    //创建一个实例
    private static $_instance = null;
    //初始化
    private function __construct($queue)
    {
        $this->queue = $queue;   
    }

    //最终类不允许被克隆
    final private function __clone()
    {
        
    }

    //创建工厂方法
    public static function getInstance($queue=\'\'){
        if(!self::$_instance){
            self::$_instance = new DelayQueue($queue);
        }
        return self::$_instance;
    }

    /**
     *@Description: 
     *@MethodAuthor: lijian
     *@Date: 2021-06-12 11:44:12
     *@param:
     *@return:
    */
    public function addTask($jobClass,$runTime,$args = null){
        
        $key = $this->prefix.$this->queue;
        //导入数据到redis中
        $params = [
            \'class\'=> $jobClass,
            \'args\' => $args,
            \'runtime\'=>$runTime
        ];

        RedisHandler::getInstance()->zAdd(
            $key,
            $runTime,
            serialize($params)
        );
    }

    /**
     *@Description: 执行job
     *@MethodAuthor: lijian
     *@Date: 2021-06-12 11:49:21
    */
    public function perform(){
        
        $key = $this->prefix.$this->queue;

        //取出有序集合第一个元素
        $result = RedisHandler::getInstance()->zRange($key,0,0);

        if(!$result) return false;
        //序列化
        $jobInfo = unserialize($result[\'0\']);

        print_r(\'job: \'.$jobInfo[\'class\'].\' will run at: \'. date(\'Y-m-d H:i:s\',$jobInfo[\'runtime\']).PHP_EOL);

        $jobClass = $jobInfo[\'class\'];

        if(!@class_exists($jobClass)){
            print_r($jobClass.\'undefined\',PHP_EOL);
            RedisHandler::getInstance()->zRem($key,$result[0]);

            return false;
        }

        //到时间执行
        if(time() >= $jobInfo[\'runtime\']){
            $job = new $jobClass;
            //获取订单号
            $job->setPayload($jobInfo[\'args\']);
            $jobResult = $job->perform();
            if($jobResult){
                //将任务移除
                RedisHandler::getInstance()->zRem($key,$result[0]);
                return true;
            }
        }
        return false;
    }
}

检验数据

    DelayQueue::getInstance(\'delay_job\')->addTask(

        \'app\\common\\delayqueue\\CloseOrder\', // 自己实现的job
   
        time()+50, // 订单失效时间
   
        [\'order_id\'=>123456] // 传递给job的参数
   
    );

以上是关于TP5 redis 延迟队列的主要内容,如果未能解决你的问题,请参考以下文章

# Java 常用代码片段

灵感来袭,基于Redis的分布式延迟队列

基于php和redis实现的延迟队列

通过redis的有序集合[zset] 实现延迟队列

基于Redisson实现延迟队列

Redis实现延迟队列方法介绍