延时队列:基于 Redis 的实现,看完直呼精彩!

Posted king哥Java架构

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了延时队列:基于 Redis 的实现,看完直呼精彩!相关的知识,希望对你有一定的参考价值。

前段时间做一个项目,需要各种定时任务处理会话状态,处理订单状态,然后需求不停的变,修修改改就觉得很麻烦,就去了解了一下有没有什么便捷的方式取代繁琐的定时任务,于是就找到了延迟队列的这种实现方式。

一、应用场景:

  • 订单超过 30 分钟未支付,则自动取消。

  • 外卖商家超时未接单,则自动取消。

  • 医生抢单电话点诊,超过 30 分钟未打电话,则自动退款。

    等等场景都可以用定时任务去轮询实现,但是当数据量过大的时候,高频轮询数据库会消耗大量的资源,此时用延迟队列来应对这类场景比较好。

二、需求

  • 消息存储

  • 过期延时消息实时获取

  • 高可用性

三、为什么使用 Redis 实现?

3.1、Rabbitmq 延时队列

  • 优点:消息持久化,分布式

  • 缺点:延时相同的消息必须扔在同一个队列,每一种延时就需要建立一个队列。因为当后面的消息比前面的消息先过期,还是只能等待前面的消息过期,这里的过期检测是惰性的。

  • 使用: RabbitMQ 可以针对 Queue 设置 x-expires 或者针对 Message 设置 x-message-ttl ,来控制消息的生存时间(可以根据 Queue 来设置,也可以根据 message 设置), Queue 还可以配置

    x-dead-letter-exchange 和 x-dead-letter-routing-key(可选)两个参数,

    如果队列内出现了 dead letter ,则按照这两个参数重新路由转发到指定的队列,此时就可以实现延时队列了。

3.2、DelayQueue 延时队列

  • 优点:无界、延迟、阻塞队列

  • 缺点:非持久化

  • 介绍:JDK 自带的延时队列,没有过期元素的话,使用 poll() 方法会返回 null 值,超时判定是通过getDelay(TimeUnit.NANOSECONDS) 方法的返回值小于等于0来判断,并且不能存放空元素。

  • 使用:getDelay 方法定义了剩余到期时间,compareTo 方法定义了元素排序规则。poll() 是非阻塞的获取数据,take() 是阻塞形式获取数据。实现 Delayed 接口即可使用延时队列。

  • 注意:DelayQueue 实现了 Iterator 接口,但 iterator() 遍历顺序不保证是元素的实际存放顺序。

/\\*\\*  
 \\* 实现 Delayed 定义延时队列  
 \\*/  
@Data  
@NoArgsConstructor  
@AllArgsConstructor  
public class Sequence implements Delayed {  
  
    private Long time;  
    private String name;  
  
    @Override  
    public long getDelay(TimeUnit unit) {  
        return time \\- System.currentTimeMillis();  
    }  
  
    @Override  
    public int compareTo(Delayed o) {  
        if (this.getDelay(TimeUnit.MILLISECONDS) \\> o.getDelay(TimeUnit.MILLISECONDS)) {  
            return 1;  
        } else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {  
            return \\-1;  
        } else {  
            return 0;  
        }  
    }  
}

3.3、Scala 的 Await & Future

  • 优点:消息实时性

  • 缺点:非持久化

  • 介绍:Scala 的 ExecutionContext 中使用

    Await 的 result(awaitable: Awaitable[T], atMost: Duration)

    方法可以根据传入的 atMost 间隔时间异步执行 awaitable。

import scala.concurrent.ExecutionContext.Implicits.global  
import scala.concurrent.{Await, Future}  
object test extends App {  
    val task \\= Future{ doSomething() }  
    Await.result(task, 5 seconds)  
}

3.4、Redis 延迟队列

  • 消息持久化,消息至少被消费一次

  • 实时性:存在一定的时间误差(定时任务间隔)

  • 支持指定消息 remove

  • 高可用性

  • Redis 的特殊数据结构 ZSet 满足延迟的特性

四、Redis 的使用

4.1、使用 sortedset 操作元素

  • 赋值:zadd key score1 value1 score2 value2… (把全部的元素添加到sorted set中,并且每个元素有其对应的分数,返回值是新增的元素个数。)

  • 获取元素:

  • zscore key value:返回指定成员的分数

  • zcard key : 获取集合中的成员数量

  • 删除元素:zrem key value1 value2 … 删除指定元素

  • zremrangebyrank key start stop:按照排名范围删除元素。

  • zremrangebyscore key min max:按照分数范围删除元素。

  • 查询元素:

  • zrange key start end withscores:查询start到end之间的成员。

  • zrevrange key start end withscores:查询成员分数从大到小顺序的索引 start 到 end 的所有成员。

  • zrangebyscore key min max withscores limit offset count:返回分数 min 到 max 的成员并按照分数从小到大排序, limit 是从 offset 开始展示几个元素。

4.2、Redis 实现方式

使用sortedset,用时间戳作为score,使用zadd key score1 value1
命令生产消息,使用zrangebysocre key min max withscores limit 0 1消费消息最早的一条消息。

这里选用 Redis 主要的原因就是其支持高性能的 score 排序,同时 Redis 的持久化 bgsave 特性,保证了消息的消费和存贮问题。bgsave 的原理是 fork 和 cow。fork 是指 Redis 通过创建子进程来进行 bgsave 操作, cow 指的是copy on write, 子进程创建后, 父进程通过共享数据段, 父进程继续提供读写服务, 写脏的页面数据会逐渐和子进程分离开来。

4.3、ACK

队列最重要的就是保证消息被成功消费,这里也不可避免的需要考虑这个问题。

如果你觉得自己学习效率低,缺乏正确的指导,可以加入资源丰富,学习氛围浓厚的技术圈一起学习交流吧!
[Java架构群]
群内有许多来自一线的技术大牛,也有在小厂或外包公司奋斗的码农,我们致力打造一个平等,高质量的JAVA交流圈子,不一定能短期就让每个人的技术突飞猛进,但从长远来说,眼光,格局,长远发展的方向才是最重要的。

  • RabbitMQ 的 ACK机制:

  • Publisher 把消息通知给 Consumer,如果 Consumer 已处理完任务,那么它将向 Broker 发送 ACK 消息,告知某条消息已被成功处理,可以从队列中移除。如果 Consumer 没有发送回 ACK 消息,那么 Broker 会认为消息处理失败,会将此消息及后续消息分发给其他 Consumer 进行处理 ( redeliver flag 置为 true )。

  • 这种确认机制和 TCP/IP 协议确立连接类似。不同的是,TCP/IP 确立连接需要经过三次握手,而 RabbitMQ 只需要一次 ACK。

  • 还有一个重要的是,RabbitMQ 当且仅当检测到 ACK 消息未发出且 Consumer 的连接终止时才会将消息重新分发给其他 Consumer ,因此不需要担心消息处理时间过长而被重新分发的情况。

  • Redis 实现 ACK

  • 需要在业务代码中处理消息失败的情况,回滚消息到原始等待队列。

  • Consumer 挂掉,仍然需要回滚消息到等待队列中。

前者只需要在业务中处理消费异常的情况,后者则需要维护两个队列。

  • Redis ACK 实现方案

  • 维护一个消息记录表,存贮消息的消费记录,用于失败时回滚消息。表中记录消息ID、消息内容、消息时间、消息状态。

  • 定时任务轮询该消息表,处理消费记录表中消费状态未成功的记录,重新放入等待队列。

4.4、多实例问题

多实例是指同一个服务部署在不同的地方,发挥相同的作用,此时就会导致同时消费同一个消息的问题。

一般情况下解决此类问题就需要考虑接入外部应用的辅助。常见的分布式锁的方案有:基于数据库实现分布式锁、基于缓存实现分布式锁、基于 Zookeeper 实现分布式锁,这里使用缓存也就是 Redis 解决问题。

  • 利用 Redis 的 setnx 的互斥特性,把 key 当作锁存在 Redis 中,但是用 setnx 需要解决死锁和正确解锁的问题。

  • 死锁:设置 key-value 的过期时间,并且使用 lua 脚本保证加锁和设置过期时间的原子性。

  • 解锁:解锁需要保证是加锁客户端进行解锁操作。将 value 设置为 UUID,用对应的 UUID 去解锁保证是加锁客户端进行对应的解锁操作。

  • 利用 Redis 的 List 实现一个 Publisher 推送消费保证只被消费一次,这种不用考虑死锁问题,但是需要额外维护一个队列。

最后

为粉丝准备了超级干货内部独家教材~需要的小伙伴可以在文末获取免费领取方式!

《Redis底层原理及架构实战笔记及高频面试解析》:

《2021 Java大厂面试解析+后端进阶完整笔记》

在这里插入图片描述

图片

需要的小伙伴,可以一键三连,下方获取免费领取方式!
在这里插入图片描述

以上是关于延时队列:基于 Redis 的实现,看完直呼精彩!的主要内容,如果未能解决你的问题,请参考以下文章

java分层架构,看完直呼内行

javagui程序开发工具,看完直呼内行

java并发面试题,看完直呼内行

倾力推荐我的几个朋友,看完直呼牛逼!

Java多进程从头讲到尾,看完直呼内行

PAT乙级满分冲刺,每日十题计划,看完直呼内行!!