RocketMQ延时消息实现原理探究
Posted 不如敲代码
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ延时消息实现原理探究相关的知识,希望对你有一定的参考价值。
由于日常开发中遇到几次使用延时消息的场景,而且目前业务中使用到的消息中间件有rabbitmq和kafka,对延时消息的支持都不太理想。
其中
- rabbitmq 延时消息是通过 设置队列ttl+死信exchange实现
- 缺点嘛:每次都得设置两个队列,一个用来实现延时,过期后经死信exchange转到对应的业务队列提供消费。
- 另:rabbitmq有提供延时插件,但缺点较多,如:1. 启动插件要么重启,要么引入一个新的集群;2. 不支持高可用,延时消息发送前只存储在当前broker节点的内部数据库Mnesia中,不会被镜像复制;3. 延时不可靠,存在消息数量较大或使用很久后延迟不准确;4:不支持大规模消息,同3;5:只支持ram节点(Mnesia数据库存在磁盘中)
- kafka 延时消息通过在消费端判断消息是否达到消费时间,决定是否进行消费实现。未达到延时时间则暂停消费。
- 缺点:针对单个topic固定的延时时间。 需要额外在消费端进行开发 (实际上这种在消费端控制延时的方式大部分消息队列都能做到)
无论是rabbitmq的死信还是kafka消费端控制,基本上都是每个topic只能使用固定的延时时间。但现实中,也存在一些同一个业务场景使用不同延时时间的消息的场景:
- 考试结束后强制交卷。不同的考试规定的时间是不一样的,不可能每个考试都创建一个新的队列。
- 一些异常的重试操作。执行某个操作失败后,需要多次不同等级的延时重试。(虽说这个用一个本地线程也可以,但是在同一台机器上延时重试,仍然存在较大可能失败。所以比较关键场景可以使用延时消息,分发到其他机器上执行。)
于是调研了一下其他的消息中间件,发现rocketmq貌似是支持延时消息的,虽然也只有固定的18个延时等级,但相比rabbitmq和kafka固定的延时消息,要好很多了。于是开始学习探究rocketmq延时消息的实现。
正文开始
先了解一下rocketmq
rocketmq 的架构
整个架构如图,简单描述一下:
- nameServer 提供注册中心的服务,负责broker的管理,以及topic路由信息的管理。
- brokerServer 则主要负责消息的存储、投递和查询及高可用。
- Producer 连接nameServer获取到broker信息后,发送信息到对应的broker。
- Consumer 同样先连接 nameServer,查询topic路由信息,然后连接broker消费消息。
消息的存储
如图,rocketmq 的所有消息都存储在 commitlog 中,然后ConsumerQueue作为逻辑消费队列,维护一个topic消息的索引,记录topic内消息在commitlog中的一些信息。其中 ConsumeQueue的存储单元为 8字节的offset+4字节的size+8字节的tags hashcode, 对于延时消息,最后8字节则用于存储消息计划投递时间。
然后关于rocketmq的延时消息的使用
rocketmq 只支持固定18个等级的延时消息:
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
发送延时消息只需要 setDelayTimeLevel 就可以,(连个延时等级相关的常量都没有。。。)
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// Launch producer
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
}
// Shutdown producer after use.
producer.shutdown();
然后关于延时消息的实现
先看下流程图
然后文字+代码介绍
-
org.apache.rocketmq.store.CommitLog#putMessage
通过查看putMessage
源码可以得知,rocketmq在最终将message存入commitlog时,会先判断是否延时消息,如果延时消息则替换topic为SCHEDULE_TOPIC_XXXX
,并将原topic存入message.properties,然后根据延时level存入指定的queue。(18个延时等级分别对应18个queue的id)。... // Delay Delivery if (msg.getDelayTimeLevel() > 0) { if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } topic = ScheduleMessageService.SCHEDULE_TOPIC; queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId); } ...
-
org.apache.rocketmq.store.schedule.ScheduleMessageService#start
延时发送的代码,启动1s后,对已创建的delayQueue启动一个投递延时消息的任务,然后根据offset批量拉取对应的消息,判断是否到达投递时间,未到达则使用timer延时对应时长后启动下一次投递任务;到达投递时间则恢复原始topic和queueid并调用writeMessageStore.putMessage(msgInner)
将消息再次投递到commitlog中,然后投递下一个消息。public void start() { if (started.compareAndSet(false, true)) { this.timer = new Timer("ScheduleMessageTimerThread", true); for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { Integer level = entry.getKey(); Long timeDelay = entry.getValue(); Long offset = this.offsetTable.get(level); if (null == offset) { offset = 0L; } // 1s 后启动一个投递延时消息的任务 if (timeDelay != null) { this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); } } // 默认每隔10s持久化一次 delayConsumeQueue的offset信息, 一个delayOfset.json文件 this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { if (started.get()) ScheduleMessageService.this.persist(); } catch (Throwable e) { log.error("scheduleAtFixedRate flush exception", e); } } }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); } }
-
org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup
延时任务处理逻辑
先获取当前延时等级的ConsumeQueue逻辑消费队列。然后根据传入的offset获取消息offset,tagsCode(延时消息存的是计划投递时间)。然后判断消息是否到期,到期则根据offset从commitLog中取出消息内容,并将其投递到原始topic中;如果未到期则再次在timer中添加一个延时任务(延时时间为计划投递的时间)。然后继续处理下一条记录。public void executeOnTimeup() { ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel)); long failScheduleOffset = offset; if (cq != null) { SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset); if (bufferCQ != null) { try { long nextOffset = offset; int i = 0; ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { long offsetPy = bufferCQ.getByteBuffer().getLong(); //消息在commitLog中的offset int sizePy = bufferCQ.getByteBuffer().getInt(); long tagsCode = bufferCQ.getByteBuffer().getLong(); // 计划投递时间 //省略一些代码.... //判断消息是否到期 long countdown = deliverTimestamp - now; if (countdown <= 0) { MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy); if (msgExt != null) { try { MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); PutMessageResult putMessageResult = ScheduleMessageService.this.writeMessageStore .putMessage(msgInner); .... } catch (Exception e) { ..... } } } else { ScheduleMessageService.this.timer.schedule( new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),countdown); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } } // end of for nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask( this.delayLevel, nextOffset), DELAY_FOR_A_WHILE); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } finally { bufferCQ.release(); } } // end of if (bufferCQ != null) else { long cqMinOffset = cq.getMinOffsetInQueue(); if (offset < cqMinOffset) { failScheduleOffset = cqMinOffset; log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset=" + cqMinOffset + ", queueId=" + cq.getQueueId()); } } } // end of if (cq != null) ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE); }
总结
rocketmq 先将不同延时等级的消息存入内部对应延时队列中,然后不断的从延时队列中拉取消息判断是否到期,然后进行投递到对应的topic中。
通过固定延时等级的方式,同一个队列中的消息都是相同的延时等级,不需要对消息进行排序,只需要按顺序拉取消息判断是否可以投递就行了。但也限制了延时时间。
另外,因为只要延时消息存入延时队列中,就会写入commitlog文件中,然后rocketmq的高可用(同步复制或异步复制)就会将消息复制到slave中,从而保证延时消息的可靠性。
虽然rocketmq不支持任意延时时间,但相比于rabbitmq的死信消息,仍然提供了18个延时等级,基本也能覆盖很多场景了。
另外:后面又看到了去哪了开源的qmq,貌似支持任意延迟时间,感觉也可以学习一波。
以上是关于RocketMQ延时消息实现原理探究的主要内容,如果未能解决你的问题,请参考以下文章
Spring boot实战项目整合阿里云RocketMQ 消息队列实现发送普通消息,延时消息
Spring boot实战项目整合阿里云RocketMQ 消息队列实现发送普通消息,延时消息