如何实现延迟消息

Posted 贤鱼可不是咸鱼

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何实现延迟消息相关的知识,希望对你有一定的参考价值。

来到新项目组后,遇到了一些需要延迟执行的业务场景,比如延迟重试发奖、延迟触发。业务通过接入Delay服务延迟消息功能来满足业务场景,比较好奇他们是如何实现的,遂抽时间学习了相关知识和业界成熟产品的实现方案,分享给大家。

引言

客户端生产者发送消息到消息队列中以后,不期望消息被立即消费,而是等待指定时间后或某个时间点后才对消费者可见,这类消息通常被称为延迟消息。
在业务中延迟消费的应用场景十分广泛,如:

  • 延迟消息
    • 订单半小时内未支付,自动关闭订单并释放商品库存
    • 重试场景:资管下游订单状态pending中,延迟10s重试拉取最终状态
  • 定时消息
    • 会议推送、定时提醒、抢票提醒

如何简单实现延迟消息

此刻我们带入一个场景头脑风暴一下,假如我在工位摸鱼看小说🐶🐶🐶,一不小心魂穿到了某外包公司的一个小马仔RD身上,此时公司运维的存储只有基础的mysql、Redis、Kafka,此时领导说客户想实现一个订单定时关闭/延迟重试更新订单状态,让你搞一个延迟队列/消息,且要以最小最快的成本实现,“我”应该怎么做?

下面是“我”与“领导”的对话:

领导
领导领导这个我知道,找个RocketMQ,直接用它延迟消息能力不行,为你这个需求专门运维个MQ不值当的。
那我整个本地内存队列行不,起个ticker一秒跑一次,这不两下就搞定了蠢货 咱们这破机器三天两头重启,在内存中的的延迟任务不都丢了?
那我将消息内容存储在DB表里,依赖mysql做持久化。每次select出所有没完成的任务,起个线程1s轮训呢?任务几千个这一秒轮训还没结束,下一秒的ticker又触发了你怎么办?
那我存个执行时间的字段,根据这个字段倒序排序,跑到大于当前时间我就停止怎么样(得意😏这不解决问题,要是你有大量消息同一秒同时触发你怎么处理,触发任务之后宕机了怎么办?
Emmmmmmmmm 那我可以起多个机器都select数据,每个机器用线程池并发执行任务,执行每个消息前拼接消息msgID setnx 1s,成功的我再去执行,这样我最多delay1s也勉强满足需求哟 还知道分布式 我先不谈你setnx1s是不是合适,如果你写入消QPS一大不是整个DB打崩了,其他业务怎么办?而且mysql有索引机制,量级一大维护excute_time二级索引的开销越来越大,导致写入会越来越慢。
那我换Redis,使用redis集群来支持高并发,而且ZSET是通过跳跃表来实现的,写时间复杂度为O(logN)。我把消息内容映射成一个msgID,用KV结构存储任务信息。再起一个zset,value用同msgID,score用执行时间戳。每次执行先从zset里取出执行时间<=当前时间的,然后从KV存储中取出对应的任务信息执行,是不是又简单又方便?嗯,redis是基于内存做的性能好很多。可是消息量级上来会引发大key问题,你打算怎么解决?
拆分大key嘛,这题我会!可以搞m个zset,把msgID hash进这m个桶里面。这样就将大key拆分了。同时还可以用多个实例处理不同的zset,加速执行。嗯,虽然还有Key逐出等问题,但勉强可以用了。还能想出其他方案吗?结合我们公司的kafka试试?
我是不是可以起一个单独的topic,用来存放延迟消息,发送延迟消息时将order_time和delay_time塞进消息结构体里。起单独的消费者组来处理延迟延迟消息,每次消费如果now_time<order_time+delay_time就塞回kakfa,反之则执行。kafka天然支持消费者组分布式消费,所以也解决了消费速度问题,起多个partion和多个消费机器就ok拉。这样实现起来很简单诶! 我当时是怎么把你这个呆瓜招进来的?!要是我有几千个1天的延迟任务,我一秒钟要消费+写入多少次,你的机器和kafka是用来干这个的?我还不如让你在内存里无线for循环内存🌚 省的造咱们kafka
那我每秒sleep一下呢?(弱弱的问哎… 那你知道你sleep中机器重启了会发生什么情况吗?如果你offset提交在新消息写入前,会导致大量延迟消息丢了。如果你offset提交在新消息写入后,会导致大量延迟消息写两份,这种比上面好,业务侧可以做幂等给你个提醒吧,目前这个方案的痛点在于每个任务消息的执行时间是乱序的,导致你需要将没到期执行的延迟消息也要commit,不然会阻塞其他消息的消费,同时你还想这个消息不丢,所以你要再写进去对吧?那咱能不能依靠kafka单partition内的有序性来做,所有延迟消息只需要消费一次,然后commit呢?
嗷嗷嗷嗷嗷! 我懂你意思!我起个1天的延迟消息topic,里面只放delay_time=1day的延迟消息,每次取出来一直sleep 1s等他执行再commit是不是就可以了,没被消费者client拉取出来的消息执行时间肯定比拉取出来的晚,所以让他在mq里等着就好!那还有个问题,kafka里有个配置叫max.poll.interval.ms,默认值是5分钟max.poll.interval.ms:如果consumer两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,会将其踢出消费组,将分区分配给别的consumer消费 ,触发rebalance 。你这个方案会导致kafka触发consumer rebalance,把你的任务分给其他消费者执行,导致double excute,所以你是不是还要把他调到1天?
哇,还是领导考虑的周到!… 你个蠢蛋,你把max.poll.interval.ms调到1天,其他业务的某一个consumer线上问题了,是不是就导致他们业务出问题了?kafka的各个语言包都支持pause函数,Pause一下就可以避免上述问题
可是这样的话是不是每个延迟interval都需要一个topic?是的,如果不同interval的业务场景很多,就要起很多个topic,会导致资源成倍增加,那就不适用于这个方案了。

在上述的讨论中我们不难得出,一个成熟的延迟消息中间件需要满足持久性分布式可用性、高性能消息存储时延误差小吞吐量多个维度的需求,其中有两个主要难点:

  • 消息排序:实现执行时间的顺序排序。在MQ中消息是要落盘持久化的,排序的逻辑太重了对性能和延迟影响很大,无法兼顾高性能、高吞吐。
  • 持久化:将消息持久化存储,保证消息不丢失。MQ的WAL机制会将过期的日志文件删除,而长延迟消息会导致日志文件长时间无法删除,越积越大。

开源中间件的实现

RocketMQ

RocketMQ 是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于2017年9月25日成为 Apache 的顶级项目。作为经历过多次阿里巴巴双十一的洗礼并有稳定出色表现的国产中间件,以其高性能、低延时和高可靠等特性近年来已经也被越来越多的企业使用。

Github:https://github.com/apache/rocketmq 实现语言:Java

下面介绍的是RocketMQ的开源版本,不支持任意时间精度,特定的 level例如定时 5s,10s,1m 等。

(但是阿里云服务版本的RocketMQ支持任意时间精度,开源的是个阉割版… 挣钱嘛,不寒颤.jpg

开源RocketMQ支持延迟消息,但是不支持任意秒级精度。默认支持18个level的延迟消息,这是通过broker端的messageDelayLevel配置项确定的,如下:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

Broker在启动时,内部会创建一个内部topic:SCHEDULE_TOPIC_XXXX,根据延迟level的个数,创建对应数量的队列,也就是说18个level对应了18个队列。

延迟消息在RocketMQ Broker端的流转如下图所示:

总共有6个步骤:

  1. 修改消息Topic名称和队列信息
    1. 在写入CommitLog之前如果是延迟消息,则先记录消息原始topic和queueID,然后改写消息的Topic为SCHEDULE_TOPIC_XXXX,并根据延迟级别确定要投递到哪个队列下。
  2. 转发消息到延迟主题的CosumeQueue中。
    1. 因为被改写Topic和queueId,所以写入的ConsumeQueue实际上非真正消息应该所属的ConsumeQueue,而是写入到ScheduledConsumeQueue中(这个特定的Queue存放不会被消费)
  3. 延迟服务消费SCHEDULE_TOPIC_XXXX消息。
    1. Broker内部有一个ScheduleMessageService类,其充当延迟服务,消费SCHEDULE_TOPIC_XXXX中的消息,并投递到目标Topic中。

    2. ScheduleMessageService在启动时,其会创建一个定时器Timer,并根据延迟级别的个数,启动对应数量的TimerTask,每个TimerTask负责一个延迟级别的消费与投递。

  4. 将信息重新存储到CommitLog中。
    1. 在将消息到期后,需要投递到目标Topic。由于在第一步已经记录了原来的Topic和队列信息,因此这里重新设置,再存储到CommitLog即可。
  5. 将消息投递到目标Topic中。
    1. 这一步与第二步类似,不过由于消息的Topic名称已经改为了目标Topic。因此消息会直接投递到目标Topic的ConsumeQueue中,之后消费者即消费到这条消息。
  6. 消费者消费目标topic中的数据。

回顾RocketMQ的实现方案,它和我们在上一个部分基于Kafka的讨论有些类似,在固定level值的前提下,基于单Queue/partition内的有序性实现,将消息排序这一难点巧妙地解决,不可谓不妙。

它的优点:

  1. 延迟消息存储和普通消息在存储上无差别,提供与普通消息无差异的可用性和持久化保障
  2. 无需依赖外部存储,将不同延迟消息排序变成了固定topic的追加写 O(1)

缺点:

  1. 只支持固定数量的延迟等级,场景受限
  2. ReQueue 全部在一个 Timer 线程中完成,有性能瓶颈

DDMQ

DDMQ 是滴滴出行架构部基于 Apache RocketMQ 构建的消息队列产品。作为分布式消息中间件,DDMQ 为滴滴出行各个业务线提供了低延迟、高并发、高可用、高可靠的消息服务。DDMQ 提供了包括实时消息、延迟消息和事务消息在内的多种消息类型以满足不同的业务需求。 用户通过统一的 Web 控制台和傻瓜式的 SDK 即可轻松接入 DDMQ 生产和消费消息,体验功能丰富、稳定的消息服务。

Github:https://github.com/didi/DDMQ 实现语言:Java

DDMQ在开源RocketMQ外包了一层代理,并开发专门的定时任务模块chronos,使用RocksDB外部存储实现了延迟消息。

上面这张图描述了 Chronos 的总体结构:

简单来说,生产 SDK 通过 PProxy 提供的 sendDelay RPC 将延时消息发送到 PProxy, 然后由 PProxy 将消息生产到 Chronos 固定的内部 topic 上(chronos_inner_xxx)。Chronos 模块再去消费 inner topic 的消息并将消息存储到本地的 RocksDB 里去。基于本地内置的 RocksDB 存储引擎构造一个时间轮服务,会将到期的消息再发送给 PProxy,以供业务方消费或 HTTP 推送给业务方。

先简单介绍一下RocksDB

Rocksdb 是基于Google LevelDB研发的高性能kv持久化存储引擎,以库组件形式嵌入程序中,为大规模分布式应用在ssd上运行提供优化。RocksDB不提供高层级的操作,例如备份、负载均衡、快照等,而是选择提供工具支持将实现交给上层应用。正是这种高度可定制化能力,允许RocksDB对广泛的需求和工作负载场景进行定制。

RocksDB是基于Log Structured-Merge Tree(日志结构合并树)实现的分布式KV存储数据库。

其主要优势在于写入性能。所有键值对数据在内存MemTable(跳表)和磁盘(SSTable)中全部有序存储,RocksDB将所有增删改都变为简单的append操作,依赖常驻线程进行compact来保证数据更新/删除和有序性保证。但顺序写保证了极高的写入性能的同时,会牺牲部分读性能。

流程如下:

  1. Chronos在启动时维护seekTimestamp,即每个服务已经消费到的延迟消息时间戳,用文件存储。
  2. Chronos起多个消费线程消费延迟消息topic,批量写入rocksDB(多线程写入时会加锁)。
  3. PushWorker单线程轮询,master从RocksDB中查询位于seekTimestamp到currentTimestamp的所有消息,将其写入阻塞队列中。
  4. Push线程池不断从阻塞队列里读取到期消息,通过Producer发送给对应topic并更新seekTimeStamp。(backup不发送消息)
  5. DeleteBgWorkder线程定期清理数据库中在seekTimestamp之前的消息。

回顾DDMQ的实现,其核心点在于引入了RocksDB这种兼顾了写入性能和范围查询能力的数据库:

  • 通过RocksDB Key的有序性实现了对到期时间的排序。
  • 通过RocksDB 外部存储实现了长延迟消息的持久化,并且极其适合消息大量写的场景。

另外DDMQ还有很多细节逻辑,比如:Chronos的主从切换,RocksDB的一致性和数据容灾,在这里就不一一阐述了。

QMQ

QMQ是去哪儿网内部广泛使用的消息中间件,自2012年诞生以来在去哪儿网所有业务场景中广泛的应用,包括跟交易息息相关的订单场景; 也包括报价搜索等高吞吐量场景。目前在公司内部日常消息qps在60W左右,生产上承载将近4W+消息topic,消息的端到端延迟可以控制在10ms以内。

Github:https://github.com/qunarcorp/qmq 实现语言:Java

QMQ的延迟队列能力是通过qmq-delay-server延迟组件实现。延迟消息delay_server和普通消息server都会向meta server注册,区别是producer发送延迟消息会将消息发给delay server,消息到期后delay server将其转发给普通消息server中,此时消费者可以消费拉取消息。

Delay server的实现逻辑如下,其核心是两层时间轮

消息的流转共经历4个模块:

  • message log:delay server收到消息后append到log就返回给producer,相当于WAL。
  • schedule log:位于磁盘上的第一层时间轮,按照投递时间组织每个小时一个,即刻度为1h。
    • 该log是回放message log后根据延时时间放置对应的log上。schedule log在回放中会copy完整的消息内容,因为消息内容从message log同步到了schedule log,所以历史message log都可以删除,这就使得message log只需要占用极小的存储空间,所以qmq可以使用低容量高性能的ssd来获取极高的吞吐量。如何用不到两千块大幅度提升QMQ性能
    • 另外,schedule log是按照延时时间组织的,所以延时时间已过的schedule log文件也可以删除,这就大大解决了持久化磁盘占用问题。
  • Hash wheel in memory:位于内存中的第二层时间轮,刻度为500ms,包含了主要运算执行逻辑。
  • dispatch log :延时/定时消息投递成功后写入,主要用于在应用重启后能够确定哪些消息已经投递,dispatch log里写入的是消息的offset,不包含消息内容。当延时server中途重启时,我们需要判断出当前这个刻度(比如一个小时)里的消息有哪些已经投递了则不重复投递。
    先介绍几个基本的组件:
  • timeout队列:阻塞队列,可以看做一个多线程到单线程的协调管道。所有延迟消息从日志中读取时会先置入timeouts队列。
  • wheels数组:模拟时间轮的数组。结构字段包括:消息索引offset、topic、到期时间。

    加载一个时间段内的消息是不是需要占用太多的内存?
    实际上我们并不会将schedule log里完整的消息加载到内存,只会加载索引到内存,根据前面的介绍,每个索引是16个字节(实际大小可以参照代码,略有出入)。假设我们使用1G内存加载一个小时索引的话,则可以装载1G/16B = (1024M * 1024K * 1024B)/(16B) = 67108864 条消息索引。则每秒qps可以达到18641(67108864 / 60 / 60)。如果我们想每秒达到10万qps,每个小时一个刻度则需要5493MB,如果觉得内存占用过高,则可以相应的缩小时间段大小,比如10分钟一个时间段,则10万qps只需要占用915MB内存。通过计算可知这种设计方式还是在合理的范围内的。

工作流:

  1. 消息拉取
    1. 将schedule log从日志中拉取(会提前load),将消息写入timeout阻塞队列(链表)。
      2. 在处理message log时发现该消息执行时间是当前时间段内,也写入timeout阻塞队列。

      正在加载某个时间段内的消息过程中又来了属于该时间段内消息如何处理,会不会重复加载?

      在我们决定加载某个时间段消息时(正在加载的时间段称之为current loading segment),我们首先会取得该时间段文件的最大offset,然后加载只会加载这个offset范围内的消息(qmq内称之为loading offset),而加载过程中如果又来了该时间段内消息,那这个消息的offset也是>loading offset:

    2. 循环触发时间轮,其执行逻辑伪代码如下:
for 
    //计算到达下一个tick需要的时间,然后sleep
    //sleepTime = serverStartTime + (tick+1)*tickDuration - time.now()
    waitForNextTick()
    // 从timeouts队列中不断取出队列元素,计算下标后写入wheels数组
    transferTimeoutsToWheels()
    // 计算当前tick下标,从wheels数组中取出需要执行的元素并删除,然后交付给处理线程执行
    readWheelsAndExcute()
    // 轮次计数自增
    tick++

回顾QMQ的实现方案,其设计妙核要点在于两层时间轮的设计:

  • 将WAL按执行时间的小时纬度分桶,作为第一层时间轮,写入成功后删除WAL。延迟消息单独存储,这样解决了长延迟导致的offset无法提交和磁盘占用问题。
  • 内存中第二层时间轮只加载offset和excute time等关键信息,并且通过时间轮算法省去了延迟消息的排序,插入删除操作都是O(1)的时间复杂度。
  • 延时加载,内存中只会有最近要消费的消息,更久的延时消息会被存储在磁盘中,对内存友好,并支持超大跨度的延迟消息。

缺点:

  • Delay server除了对Schedule log轮询扫描写入timeouts队列外,还需要while循环进行时间轮逻辑,CPU空转成本高。
  • 此外还有一些关于架构相关的劣势:比如为了主从log之间的强一致性导致可用性下降,delay server有状态导致无法弹性扩缩容等等,本次分享只针对延迟消息整体逻辑上的实现,携程针对QMQ开源版本的这些缺点做了优化,有兴趣的同学可以自行了解。

总结

延迟队列公司MQ依赖存储
RocketMQ阿里(免费版)RocketMQ和普通消息一致
DDMQ滴滴RocketMQRocksDB
QMQ去哪儿自研MQ文件系统
lmstfy美图Redis

从前述各个队列的实现,我们不难得出延迟队列的核心是Delay server模块,它真正实现了:

  • 从Producer接受/消费延迟消息,并对延迟消息进行解析、暂存和排序,实现存储与普通消息的隔离。
  • 对延迟消息进行轮询,筛选到期的延迟消息转变为普通消息,使得消费者可正常消费。

如文章开头所述,这中间设计的逻辑难点主要有两个:消息排序持久化

Delay server解决这两个问题的方式主要有两个:

  • 基于外部存储。引入RocksDB这类大量写范围查性能好的外部存储,实现对延迟消息的持久化。后台常驻线程定时范围查询出到期的延迟消息进行转发。
  • 基于时间轮。将延迟消息到期时间按时间分桶拆分成小文件,通过数组+链表方式提前load到内存中,将消息根据到期时间放进对应下标所存的链表中。游标每固定时间前进一个单位,遍历当前数组下标对应的链表取出到期消息,进行转发。

以上是关于如何实现延迟消息的主要内容,如果未能解决你的问题,请参考以下文章

[译] Golang实时垃圾回收理论和实践

实践:RabbitMQ 延迟队列,消息延迟推送的实现

图文结合!Redis延迟队列golang高效实践

golang使用Nsq

golang中间件简单实现

10 SpringBoot整合RocketMQ实现延迟消息