Lavavel5.5源代码 - RedisQueue是怎么实现
Posted ~~逍遥~~
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Lavavel5.5源代码 - RedisQueue是怎么实现相关的知识,希望对你有一定的参考价值。
队列的基本功能:
1、立即执行;yes
2、延迟执行;yes
3、保证至少执行一次;yes
4、必须执行且最多执行一次;no
用到的数据结构:
list、Sorted sets
延迟执行的机制:
1、先把数据放入SortedSets类型的queues:queue_000:delayed中
2、在执行pop的时候,执行lua脚本,把SortedSets类型的queues:queue_000:delayed 中可以执行的数据rpush到list类型的queues:queue_000中
保证执行成功的机制:
1、把要执行的数据先放入SortedSets类型的queues:queue_000:reserved中
2、在执行pop的时候,执行lua脚本,把SortedSets类型的queues:queue_000:reserved 中可以执行的数据rpush到list类型的queues:queue_000中
3、任务执行成功,从SortedSets类型的queues:queue_000:reserved中执行删除预存的数据
class RedisQueue extends Queue implements QueueContract { public function pushRaw($payload, $queue = null, array $options = []) { $this->getConnection()->rpush( $this->getQueue($queue), // list类型的queues:queue_000 $payload // $payload === "标准化后的数据,进行json格式化"的数据 ); return json_decode($payload, true)[‘id‘] ?? null; } protected function laterRaw($delay, $payload, $queue = null) { $this->getConnection()->zadd( $this->getQueue($queue).‘:delayed‘, // SortedSets类型的queues:queue_000:delayed $this->availableAt($delay), // 延迟执行 $payload // $payload === "标准化后的数据,进行json格式化"的数据 ); return json_decode($payload, true)[‘id‘] ?? null; } public function pop($queue = null) { // 执行lua脚本,把SortedSets类型的queues:queue_000:delayed 中可以执行的数据rpush到list类型的queues:queue_000中 // 执行lua脚本,把SortedSets类型的queues:queue_000:reserved 中可以执行的数据rpush到list类型的queues:queue_000中 $this->migrate($prefixed = $this->getQueue($queue)); // 执行lua脚本,从list类型的queues:queue_000中lpop出数据,attempts加1,然后设定超时时间并放入结构把SortedSets类型的queues:queue_000:reserved 中 list($job, $reserved) = $this->retrieveNextJob($prefixed); if ($reserved) { return new RedisJob( $this->container, $this, $job, $reserved, $this->connectionName, $queue ?: $this->default ); } } }
以上是关于Lavavel5.5源代码 - RedisQueue是怎么实现的主要内容,如果未能解决你的问题,请参考以下文章
Lavavel5.5源代码 - RedisQueue是怎么实现
Lavavel5.5源代码 - RedisQueue是怎么实现