消息队列(MQ)消息延迟及过滤设计方案

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列(MQ)消息延迟及过滤设计方案相关的知识,希望对你有一定的参考价值。

参考技术A

消息队列接收到请求后,会将消息顺序写入Physic log文件,对于延迟消息,将消息按照将要投递的时间,以小时为单位异步分割存储,每个小时的消息顺序写入延迟物理文件delay log,并把索引存储在delay index文件,索引记录消息在delay log中的offset,size,投递时间戳元信息,基于有限内存以及延迟消息分发特性,我们仅将最近两个小时的delay log文件序列采用mmap内存映射机制进行读写,延迟2个小时以上的消息直接写入磁盘文件。

但消息分发时,每次需要将一个小时的的索引文件,全部加载到内存,由于每个小时的消息索引是顺序写入delay index的,而消息分发投递时间又是随机的,写入顺序与消息投递顺序并不一致,所以,索引加载到内存后,需要按照消息具体投递的秒级时间戳进行排序,再根据排序后的索引读取delay log中的消息进行分发投递。

这种存储方案有以下问题: 1.一次需要加载整个小时的消息索引到内存,若并发比较高,内存压力比较大。 2.按照消息投递秒级时间戳进行排序后,实时到来的新的消息,需要实时插入排序,性能较低,延迟大。

为了解决上述问题,我们将delay index中索引元信息offset, size, 投递时间戳改为offset, size, localIndex, globalIndex, preGlobalIndex ,其中:

通过globalIndex可以直接定位到delay index中的索引单元,从而确定delay log中的一条消息,而preGlobalIndex又可以定位到同一秒内的上一条消息,因此只要落地存储每个小时,每秒最后一条消息的索引ID,即可逆序查出每秒所有消息。一个小时内只有3600秒,只需要将3600个16字节的索引ID加载到内存,即可实现每秒消息的实时加载。

为了降低消息分发延迟,可将最近10s的消息索引提前预加载到内存,对于实时接收到的消息,根据时间戳匹配到对应的秒,更新这一秒最新一条消息的索引globalIndex与逆向索引preGlobalIndex,不需要做排序,消息插入与读取的复杂度都为O(1)。

采用的由数组加链表实现的多级时间轮机制,分别是秒级和小时级,小时级时间轮前移一个槽,对应秒级时间轮旋转一圈,秒级时间轮上一共3600个槽,每个槽的时间跨度最大为1s,时间轮每秒前移一个槽。小时级实践论每个槽时间跨度是1小时,每小时移动一个槽,将后面两个小时的delay log开启内存映射,同时清除两个小时之前delay log文件内存映射。

当我们只有一个2小时5分钟的消息发送时,秒级时间轮需要推动2圈后即小时级时间轮移动2个槽,剩5分钟的延迟,再降级到秒级时间轮。这叫造成了时间轮的空转。

一般会把每个使用到的槽都会放到DelayQueue中,然后根据DelayQueue来 协助时间轮的推进 ,防止空推进的情况。例如,当有延迟500s的任务时,除了挂载到时间轮外,我们还会把其放到DelayQueue中,这样DelayQueue的头结点为延迟500s,如果期间没有小于500s的延迟任务再加进来时,我们只需要等待500s,时间轮推进一次即可。如果有小于500s的定时任务新加进来,我们只需要唤醒DelayQueue,重新计算等待时间即可。

即当有定时任务新增时,如果对应槽为新槽(即新增任务为该槽的第一个任务),在DelayQueue中增加延迟任务,并判断是否为头结点,是的话唤醒DelayQueue重新计算等待时间。

当master发生漂移或者网络异常时,时间轮分发控制需要从原master节点切换到新的master节点。为了保证分发状态的连续性与一致性,master节点定时每隔50ms分别将两个时间轮上分发的tick信息同步到其它slave节点。通过tick可确定具体分发到第几秒,但不能确定分发到这一秒的第几条消息,为此二级时间轮增加同步了一个参数localIndex,记录当前秒分发到第几条消息,并且每个节点都会定时将分发状态持久化。

每当master发生切换时,原master节点切换为slave,会立即停止当前时间轮的分发任务,并清空分发状态;而新的master节点根据当前已同步过来的分发状态初始化两级时间轮,但master切换会有一定的延迟周期或者极端情况下不同节点间时钟存在偏差,新的master初始完时间轮的tick后,该tick对应的秒级时间戳有可能与节点实际时间不一致,启动分发任务前需要做特殊调整,若tick时间戳小于当前时间,则分发任务sleep等待直至时间对齐,若tick时间戳大于当前时间,说明存在已到期的消息未分发,此时连续推进tick迁移,并对到期消息直接异步投递,直到tick对应时间戳小于当前时间。

正常master切换分为两种情况,一种为主动释放master,如节点重启与master负载均衡过程,这种情况节点在drop master之前,会首先同步时间轮分发状态到其它slave节点,此时master切换时间轮分发时完全连续一致的;另一种是一些异常情况下master被动漂移,此时新的master节点上时间轮分发状态可能存在最大50ms的延迟,会出现部分消息重复分发现象。把时间轮分发状态信息封装到到期投递消息协议扩展字段中,paxos请求同步消息时携带时间轮状态,即可做到实时同步。

众所周知,RocketMQ是支持消息过滤的,即发送消息时,可以给消息设置一个TAG。订阅主题的时候,可以设置只消费携带某些TAG的消息,起到消息过滤的作用。

客户端拉取消息时,在服务端得到tag的hash集合codeSet,然后从ConsumerQueue获取一条记录,判断记录的hashCode是否在codeSet中,以达到消息过滤的目的,决定是否将该消息发送给consumer。

因为Hash存在冲突,过滤不完全准确,所以,客户端收到消息后,会进行再次精准过滤。

还有一种过滤方式,把TAG通过哈希转换为long,索引中保存所有TAG的哈希值按位或的结果。当拉取消息时,通过订阅设置的TAG哈希值与索引中的哈希值进行按位与操作,如果结果等于订阅设置的TAG哈希值,说明该索引对应的消息可能符合条件,二次精准过滤依旧在客户端来做;否则,一定不符合条件,直接过滤掉。

基于redis的延迟消息队列设计

需求背景

  • 用户下订单成功之后隔20分钟给用户发送上门服务通知短信
  • 订单完成一个小时之后通知用户对上门服务进行评价
  • 业务执行失败之后隔10分钟重试一次

    类似的场景比较多 简单的处理方式就是使用定时任务 假如数据比较多的时候 有的数据可能延迟比较严重,而且越来越多的定时业务导致任务调度很繁琐不好管理。

队列设计

目前可以考虑使用rabbitmq来满足需求 但是不打算使用,因为目前太多的业务使用了另外的MQ中间件。

开发前需要考虑的问题?

  • 及时性 消费端能按时收到
  • 同一时间消息的消费权重
  • 可靠性 消息不能出现没有被消费掉的情况
  • 可恢复 假如有其他情况 导致消息系统不可用了 至少能保证数据可以恢复
  • 可撤回 因为是延迟消息 没有到执行时间的消息支持可以取消消费
  • 高可用 多实例 这里指HA/主备模式并不是多实例同时一起工作
  • 消费端如何消费

    当然初步选用redis作为数据缓存的主要原因是因为redis自身支持zset的数据结构(score 延迟时间毫秒) 这样就少了排序的烦恼而且性能还很高,正好我们的需求就是按时间维度去判定执行的顺序 同时也支持map list数据结构。

简单定义一个消息数据结构

 
private String topic;/***topic**/
private String id;/***自动生成 全局惟一 snowflake**/
private String bizKey;
private long delay;/***延时毫秒数**/
private int priority;//优先级
private long ttl;/**消费端消费的ttl**/
private String body;/***消息体**/
private long createTime=System.currentTimeMillis();
private int status= Status.WaitPut.ordinal();

运行原理:

  1. Map来存储元数据。id作为key,整个消息结构序列化(json/…)之后作为value,放入元消息池中。
  2. id放入其中(有N个)一个zset有序列表中,以createTime+delay+priority作为score。修改状态为正在延迟中
  3. 使用timer实时监控zset有序列表中top 10的数据 。 如果数据score<=当前时间毫秒就取出来,根据topic重新放入一个新的可消费列表(list)中,在zset中删除已经取出来的数据,并修改状态为待消费
  4. 客户端获取数据只需要从可消费队列中获取就可以了。并且状态必须为待消费 运行时间需要<=当前时间的 如果不满足 重新放入zset列表中,修改状态为正在延迟。如果满足修改状态为已消费。或者直接删除元数据。

客户端

因为涉及到不同程序语言的问题,所以当前默认支持http访问方式。

  1. 添加延时消息添加成功之后返回消费唯一ID POST /push {…..消息体}
  2. 删除延时消息 需要传递消息ID GET /delete?id=
  3. 恢复延时消息 GET /reStore?expire=true|false expire是否恢复已过期未执行的消息。
  4. 恢复单个延时消息 需要传递消息ID GET /reStore/id
  5. 获取消息 需要长连接 GET /get/topic

用nginx暴露服务,配置为轮询 在添加延迟消息的时候就可以流量平均分配。

目前系统中客户端并没有采用HTTP长连接的方式来消费消息,而是采用MQ的方式来消费数据这样客户端就可以不用关心延迟消息队列。只需要在发送MQ的时候拦截一下 如果是延迟消息就用延迟消息系统处理。

消息可恢复

实现恢复的原理 正常情况下一般都是记录日志,比如mysqlbinlog等。

这里我们直接采用mysql数据库作为记录日志。

目前打算创建以下2张表:

  1. 消息表 字段包括整个消息体
  2. 消息流转表 字段包括消息ID、变更状态、变更时间、zset扫描线程Name、host/ip

定义zset扫描线程Name是为了更清楚的看到消息被分发到具体哪个zset中。前提是zset的key和监控zset的线程名称要有点关系 这里也可以是zset key。

举个栗子

假如redis服务器宕机了,重启之后发现数据也没有了。所以这个恢复是很有必要的,只需要从表1也就是消息表中把消息状态不等于已消费的数据全部重新分发到延迟队列中去,然后同步一下状态就可以了。

当然恢复单个任务也可以这么干。

关于高可用

分布式协调还是选用zookeeper吧。

如果有多个实例最多同时只能有1个实例工作 这样就避免了分布式竞争锁带来的坏处,当然如果业务需要多个实例同时工作也是支持的,也就是一个消息最多只能有1个实例处理,可以选用zookeeper或者redis就能实现分布式锁了。

最终做了一下测试多实例同时运行,可能因为会涉及到锁的问题性能有所下降,反而单机效果很好。所以比较推荐基于docker的主备部署模式。

扩展

支持zset队列个数可配置 避免大数据带来高延迟的问题。

目前存在日志和redis元数据有可能不一致的问题 如mysql挂了,写日志不会成功。

设计图:

 

另外分享一个不完整简陋的开源版本  https://github.com/peachyy/sdmq.git  后期会进行模块拆分 优化



欢迎关注我的微信公众号&lt;笑笑笑技术圈&gt; 我会不定期发布一些不限于技术的文章

 

以上是关于消息队列(MQ)消息延迟及过滤设计方案的主要内容,如果未能解决你的问题,请参考以下文章

MQ-死信队列实现消息延迟

RabbitMQ之消息可靠性死信交换机惰性队列及集群

RabbitMQ之消息可靠性死信交换机惰性队列及集群

扩展自定义mq组件,使用rabbitmq_delayed_message_exchange延迟组件,完善消息延迟消息精度问题

MQ问题及解决方案

基于redis的延迟消息队列设计