基于Redisson实现延迟队列

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于Redisson实现延迟队列相关的知识,希望对你有一定的参考价值。

参考技术A

假设有这样一个场景,我们有一个订单,或者工单等等。需要在超时30分钟后进行关闭。这个时候我们最先想到的应该是采用定时任务去进行轮训判断,但是呢,每个订单的创建时间是不一样的,这个时间怎么确定才好呢,5分钟。。1分钟。。执行一次吗。这样就会非常影响性能。且时间误差很大。基于以上业务需要我们想到了有以下解决方案。

我们首先来回顾下JDK的延迟队列

基于延迟队列要实现接口 Delayed ,并且实现 getDelay 方法和 compareTo 方法

订单的实体,为了简单就定义基础几个字段。

为了简单我们暂且定义延迟时间为10s

输出结果

2022-07-01T15:00
当前时间:2022-07-01T15:10:37.375

当然今天的主角是它了,我们主要围绕着基于Redisson的延迟队列来说。

其实Redisson延迟队列内部也是基于redis来实现的,我们先来进行整合使用看看效果。基于springboot

1.依赖:

2.创建redisson.yml

3.创建配置类RedissonConfig,这里是为了读取我们刚刚创建在配置文件中的yml

4.测试

控制台输出:

订单生成时间2022-07-01T15:22:10.304
订单关闭时间2022-07-01T15:22:20.414

我们首先来了解两个API

那么为什么会涉及到两个队列呢,这两个队列到底有什么用呢?

首先我们实际操作的是RBlockingQueue阻塞队列,并不是RDelayedQueue队列,RDelayedQueue对接主要是提供中间转发的一个队列,类似中间商的意思

画个小图理解下

这里不难看出我们都是基于 RBlockingQueue 目标队列在进行消费,而 RDelayedQueue 就是会把过期的消息放入到我们的目标队列中

我们只要从 RBlockingQueue 队列中取数据即可。

好像还是不够深入,我们接着看。我们知道 Redisson 是基于redis来实现的那么我们看看里面到底做了什么事

打开redis客户端,执行monitor命令,看下在执行上面订单操作时redis到底执行了哪些命令

monitor命令可以看到操作redis时执行了什么命令

这里参考: https://zhuanlan.zhihu.com/p/343811173

我们知道Zset是按照分数升序的也就是最小的分数在最前面,基于这个特点,大致明白,利用过期时间的时间戳作为分数放入到Zset中,那么即将过期的就在最上面。

直接上个图解

灵感来袭,基于Redis的分布式延迟队列

延迟队列

延迟队列,也就是一定时间之后将消息体放入队列,然后消费者才能正常消费。比如1分钟之后发送短信,发送邮件,检测数据状态等。

Redisson Delayed Queue

如果你项目中使用了redisson,那么恭喜你,使用延迟队列将非常的简单。

 

基于Redis的Redisson分布式延迟队列(Delayed Queue)结构的RDelayedQueue Java对象在实现了RQueue接口的基础上提供了向队列按要求延迟添加项目的功能。该功能可以用来实现消息传送延迟按几何增长或几何衰减的发送策略。

RQueue<String> distinationQueue = ...
RDelayedQueue<String> delayedQueue = getDelayedQueue(distinationQueue);
// 10秒钟以后将消息发送到指定队列
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
// 一分钟以后将消息发送到指定队列
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);

在该对象不再需要的情况下,应该主动销毁。仅在相关的Redisson对象也需要关闭的时候可以不用主动销毁。

Java DelayQueue

DelayQueue它本质上是一个队列,而这个队列里也只有存放Delayed的子类才有意义。

延迟队列demo

public class DelayTask implements Delayed {
    private long startDate;
    public DelayTask(Long delayMillions) {
        this.startDate = System.currentTimeMillis() + delayMillions;
    }


    @Override
    public int compareTo(Delayed o) {
        Long.compare(this.getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
    }


    @Override
    public long getDelay(TimeUnit unit) {
        return this.startDate - System.currentTimeMillis();
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<DelayTask> queue = new DelayQueue<>();
        DelayTask delayTask = new DelayTask(1000 * 5L);
        queue.put(delayTask);
        while (queue.size()>0){
            queue.take();
        }
    }
}

延迟队列消费原理

源码中出现了三次await字眼:
  • 第一次是当队列为空时,等待;
  • 第二次等待是因为,发现有任务,没有到执行时间,并且有准备执行的线程(leader),那不好意思,还得接续等待直到下一个可执行的任务。
  • 第三次是真正延时的地方了,available.awaitNanos(delay),此时也没有别的线程要执行,也就是我将要执行,等待剩下的延迟时间即可。

延迟队列生产原理

为保证消费者正常消费,如果优先队列头元素和当前放入元素相等,则说明当前元素消费的优先级高,重置准备消费的线程(leader)为null,唤醒消费者线程重新执行take方法逻辑。

手写一个Redis延迟队列

Redis延迟队列设计

延迟消息体设计

延迟消息体Message实现了Delayed接口,这样Java DelayQueue就知道什么时候取出消息体。

Redis延迟队列实现

RedisDelayQueue构造函数依赖redis操作缓存服务对象目标队列名称(redis key)。

offer方法传入member(具体消息),delay(延迟时间),timeUnit(时间单位),然后封装成延迟消息体Message对象,放入Java DelayQueue中。

run方法是一个循环体,不断的从Java DelayQueue对象中获取消息体,然后放入redis对应的目标队列里。

延迟队列测试demo

控制台打印效果

思考

这种方案实现比较简单,使用的时候一定要谨慎,应用于延迟小,消息量不大的场景是没问题的,毕竟Java DelayQueue是占用内存的。另外也可以考虑利用Redis的sorted set 结构实现延迟队列【灵感来袭,基于Redis的分布式延迟队列(续)】,使用TimeStamp作为score,比如你的任务是要延迟5分钟,那么就在当前时间上加5分钟作为 score ,轮询任务每秒只轮询 score 小于等于 当前时间的 key即可,如果任务支持有误差,那么当没有扫描到有效数据的时候可以休眠对应时间再继续轮询。

以上是关于基于Redisson实现延迟队列的主要内容,如果未能解决你的问题,请参考以下文章

综合中间件Redisson实战

基于消息队列(RabbitMQ)实现延迟任务

基于php和redis实现的延迟队列

DelayQueue延迟队列-实现缓存

基于消息队列(RabbitMQ)实现延迟任务

rabbitmq死信队列及延迟队列