TP5 redis 延迟队列
Posted 野香蕉
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了TP5 redis 延迟队列相关的知识,希望对你有一定的参考价值。
TP5.1 下载安装Redis
配置redis信息
<?php
namespace app\\common\\redis;
class RedisHandler
{
public $provider;
//创建实例子
private static $_instance = null;
//创建redis实例子
private function __construct()
{
$this->provider = new \\Redis();
$this->provider->connect(config(\'redis.redis_host\'),config(\'redis.redis_port\'));
}
final private function __clone()
{
}
public static function getInstance(){
if(!self::$_instance){
self::$_instance = new RedisHandler();
}
return self::$_instance;
}
/**
* @param string $key 有序集key
* @param number $score 排序值
* @param string $value 格式化的数据
* @return int
*/
public function zAdd($key,$score,$value){
return $this->provider->zAdd($key,$score,$value);
}
/**
* 获取有序集合数据
* @param $key
* @param $start
* @param $end
* @param null $withsscores
* @param array
*/
public function zRange($key,$start,$end,$withsscores = null){
return $this->provider->zRange($key,$start,$end,$withsscores);
}
/**
* 删除有序集合
* @param $key
* @param $member
* @param int
*/
public function zRem($key,$member){
return $this->provider->zRem($key,$member);
}
}
创建一个命令
目录为
namespace app\\command;
<?php
namespace app\\command;
use app\\common\\delayqueue\\DelayQueue;
use think\\console\\Command;
use think\\console\\Input;
use think\\console\\Output;
class DelayQueueWorker extends Command
{
const COMMAND_ARGV_1 = \'queue\';
protected function configure()
{
$this->setName(\'delay-queue\')->setDescription(\'延迟队列任务进程\');
$this->addArgument(self::COMMAND_ARGV_1);
}
protected function execute(Input $input, Output $output)
{
$queue = $input->getArgument(self::COMMAND_ARGV_1);
//参数1 延迟队列表名,对应与redis的有序集key名
//这边是使用while死循环 来监听
while (true) {
echo $queue."===".time()."\\n";
DelayQueue::getInstance($queue)->perform();
usleep(1000000);
}
}
}
队列创建方法
<?php
namespace app\\common\\delayqueue;
use app\\common\\redis\\RedisHandler;
class DelayQueue
{
//默认参数
private $prefix = "delay_queue:";
private $queue;
//创建一个实例
private static $_instance = null;
//初始化
private function __construct($queue)
{
$this->queue = $queue;
}
//最终类不允许被克隆
final private function __clone()
{
}
//创建工厂方法
public static function getInstance($queue=\'\'){
if(!self::$_instance){
self::$_instance = new DelayQueue($queue);
}
return self::$_instance;
}
/**
*@Description:
*@MethodAuthor: lijian
*@Date: 2021-06-12 11:44:12
*@param:
*@return:
*/
public function addTask($jobClass,$runTime,$args = null){
$key = $this->prefix.$this->queue;
//导入数据到redis中
$params = [
\'class\'=> $jobClass,
\'args\' => $args,
\'runtime\'=>$runTime
];
RedisHandler::getInstance()->zAdd(
$key,
$runTime,
serialize($params)
);
}
/**
*@Description: 执行job
*@MethodAuthor: lijian
*@Date: 2021-06-12 11:49:21
*/
public function perform(){
$key = $this->prefix.$this->queue;
//取出有序集合第一个元素
$result = RedisHandler::getInstance()->zRange($key,0,0);
if(!$result) return false;
//序列化
$jobInfo = unserialize($result[\'0\']);
print_r(\'job: \'.$jobInfo[\'class\'].\' will run at: \'. date(\'Y-m-d H:i:s\',$jobInfo[\'runtime\']).PHP_EOL);
$jobClass = $jobInfo[\'class\'];
if(!@class_exists($jobClass)){
print_r($jobClass.\'undefined\',PHP_EOL);
RedisHandler::getInstance()->zRem($key,$result[0]);
return false;
}
//到时间执行
if(time() >= $jobInfo[\'runtime\']){
$job = new $jobClass;
//获取订单号
$job->setPayload($jobInfo[\'args\']);
$jobResult = $job->perform();
if($jobResult){
//将任务移除
RedisHandler::getInstance()->zRem($key,$result[0]);
return true;
}
}
return false;
}
}
检验数据
DelayQueue::getInstance(\'delay_job\')->addTask(
\'app\\common\\delayqueue\\CloseOrder\', // 自己实现的job
time()+50, // 订单失效时间
[\'order_id\'=>123456] // 传递给job的参数
);
以上是关于TP5 redis 延迟队列的主要内容,如果未能解决你的问题,请参考以下文章