rocketmq实现延时队列

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rocketmq实现延时队列相关的知识,希望对你有一定的参考价值。

参考技术A 说明:rocketmq实现的延时队列只支持特定的延时时间段,1s,5s,10s,...2h,不能支持任意时间段的延时

具体实现:rocketmq发送延时消息时先把消息按照延迟时间段发送到指定的队列中(rocketmq把每种延迟时间段的消息都存放到同一个队列中)然后通过一个定时器进行轮训这些队列,查看消息是否到期,如果到期就把这个消息发送到指定topic的队列中,这样的好处是同一队列中的消息延时时间是一致的,还有一个好处是这个队列中的消息时按照消息到期时间进行递增排序的,说的简单直白就是队列中消息越靠前的到期时间越早

流程图

源码分析:

如果想要深入了解的可以看一下ScheduleMessageService这个类

delayLevelTable定义了延迟级别和延迟时间的对应关系,offsetTable存放延延迟级别对应的队列消费的offset

使用timer定时器启动了一个定时任务,把每个扫描队列封装成一个任务,然后加入到timer中

每个扫描任务主要是把队列中所有到期的消息都拿出来,并发送到指定的topic下,并把延迟队列中的消息删除

总结

优点:设计简单,把所有相同延迟时间的消息都先放到一个队列中,定时扫描,可以保证消息消费的有序性

缺点:定时器采用了timer,timer是单线程运行,如果延迟消息数量很大的情况下,可能单线程处理不过来,造成消息到期后也没有发送出去的情况

改进点:可以在每个延迟队列上各采用一个timer,或者使用timer进行扫描,加一个线程池对消息进行处理,这样可以提供效率

RocketMQ延时消息实现原理探究

由于日常开发中遇到几次使用延时消息的场景,而且目前业务中使用到的消息中间件有rabbitmq和kafka,对延时消息的支持都不太理想。
其中

  • rabbitmq 延时消息是通过 设置队列ttl+死信exchange实现
    • 缺点嘛:每次都得设置两个队列,一个用来实现延时,过期后经死信exchange转到对应的业务队列提供消费。
    • 另:rabbitmq有提供延时插件,但缺点较多,如:1. 启动插件要么重启,要么引入一个新的集群;2. 不支持高可用,延时消息发送前只存储在当前broker节点的内部数据库Mnesia中,不会被镜像复制;3. 延时不可靠,存在消息数量较大或使用很久后延迟不准确;4:不支持大规模消息,同3;5:只支持ram节点(Mnesia数据库存在磁盘中)
  • kafka 延时消息通过在消费端判断消息是否达到消费时间,决定是否进行消费实现。未达到延时时间则暂停消费。
    • 缺点:针对单个topic固定的延时时间。 需要额外在消费端进行开发 (实际上这种在消费端控制延时的方式大部分消息队列都能做到)

无论是rabbitmq的死信还是kafka消费端控制,基本上都是每个topic只能使用固定的延时时间。但现实中,也存在一些同一个业务场景使用不同延时时间的消息的场景:

  • 考试结束后强制交卷。不同的考试规定的时间是不一样的,不可能每个考试都创建一个新的队列。
  • 一些异常的重试操作。执行某个操作失败后,需要多次不同等级的延时重试。(虽说这个用一个本地线程也可以,但是在同一台机器上延时重试,仍然存在较大可能失败。所以比较关键场景可以使用延时消息,分发到其他机器上执行。)

于是调研了一下其他的消息中间件,发现rocketmq貌似是支持延时消息的,虽然也只有固定的18个延时等级,但相比rabbitmq和kafka固定的延时消息,要好很多了。于是开始学习探究rocketmq延时消息的实现。

正文开始

先了解一下rocketmq

rocketmq 的架构

整个架构如图,简单描述一下:

  • nameServer 提供注册中心的服务,负责broker的管理,以及topic路由信息的管理。
  • brokerServer 则主要负责消息的存储、投递和查询及高可用。
  • Producer 连接nameServer获取到broker信息后,发送信息到对应的broker。
  • Consumer 同样先连接 nameServer,查询topic路由信息,然后连接broker消费消息。

消息的存储


如图,rocketmq 的所有消息都存储在 commitlog 中,然后ConsumerQueue作为逻辑消费队列,维护一个topic消息的索引,记录topic内消息在commitlog中的一些信息。其中 ConsumeQueue的存储单元为 8字节的offset+4字节的size+8字节的tags hashcode, 对于延时消息,最后8字节则用于存储消息计划投递时间。

然后关于rocketmq的延时消息的使用

rocketmq 只支持固定18个等级的延时消息:
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

发送延时消息只需要 setDelayTimeLevel 就可以,(连个延时等级相关的常量都没有。。。)

    DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
     // Launch producer
     producer.start();
     int totalMessagesToSend = 100;
     for (int i = 0; i < totalMessagesToSend; i++) {
         Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
         // This message will be delivered to consumer 10 seconds later.
         message.setDelayTimeLevel(3);
         // Send the message
         producer.send(message);
     }

     // Shutdown producer after use.
     producer.shutdown();

然后关于延时消息的实现

先看下流程图

然后文字+代码介绍

  • org.apache.rocketmq.store.CommitLog#putMessage
    通过查看putMessage源码可以得知,rocketmq在最终将message存入commitlog时,会先判断是否延时消息,如果延时消息则替换topic为SCHEDULE_TOPIC_XXXX,并将原topic存入message.properties,然后根据延时level存入指定的queue。(18个延时等级分别对应18个queue的id)。

    ...
        // Delay Delivery
        if (msg.getDelayTimeLevel() > 0) {
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
    
            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
    
            // Backup real topic, queueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
    
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    ...
    
  • org.apache.rocketmq.store.schedule.ScheduleMessageService#start
    延时发送的代码,启动1s后,对已创建的delayQueue启动一个投递延时消息的任务,然后根据offset批量拉取对应的消息,判断是否到达投递时间,未到达则使用timer延时对应时长后启动下一次投递任务;到达投递时间则恢复原始topic和queueid并调用writeMessageStore.putMessage(msgInner)将消息再次投递到commitlog中,然后投递下一个消息。

    public void start() {
        if (started.compareAndSet(false, true)) {
            this.timer = new Timer("ScheduleMessageTimerThread", true);
            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                Integer level = entry.getKey();
                Long timeDelay = entry.getValue();
                Long offset = this.offsetTable.get(level);
                if (null == offset) {
                    offset = 0L;
                }
                // 1s 后启动一个投递延时消息的任务
                if (timeDelay != null) {
                    this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
                }
            }
    
            // 默认每隔10s持久化一次 delayConsumeQueue的offset信息, 一个delayOfset.json文件
            this.timer.scheduleAtFixedRate(new TimerTask() {
                @Override
                public void run() {
                    try {
                        if (started.get()) ScheduleMessageService.this.persist();
                    } catch (Throwable e) {
                        log.error("scheduleAtFixedRate flush exception", e);
                    }
                }
            }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
        }
    }
    
  • org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup 延时任务处理逻辑
    先获取当前延时等级的ConsumeQueue逻辑消费队列。然后根据传入的offset获取消息offset,tagsCode(延时消息存的是计划投递时间)。然后判断消息是否到期,到期则根据offset从commitLog中取出消息内容,并将其投递到原始topic中;如果未到期则再次在timer中添加一个延时任务(延时时间为计划投递的时间)。然后继续处理下一条记录。

    public void executeOnTimeup() {
            ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));
    
            long failScheduleOffset = offset;
    
            if (cq != null) {
                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
                if (bufferCQ != null) {
                    try {
                        long nextOffset = offset;
                        int i = 0;
                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            long offsetPy = bufferCQ.getByteBuffer().getLong(); //消息在commitLog中的offset
                            int sizePy = bufferCQ.getByteBuffer().getInt();
                            long tagsCode = bufferCQ.getByteBuffer().getLong(); // 计划投递时间
    
                            //省略一些代码....
    
                            //判断消息是否到期
                            long countdown = deliverTimestamp - now;
                            if (countdown <= 0) {
                                MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
                                if (msgExt != null) {
                                    try {
                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                        PutMessageResult putMessageResult =
                                            ScheduleMessageService.this.writeMessageStore
                                                .putMessage(msgInner);
    
                                        ....
                                        
                                    } catch (Exception e) {
                                        .....
                                    }
                                }
                            } else {
                                ScheduleMessageService.this.timer.schedule(
                                    new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),countdown);
                                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                return;
                            }
                        } // end of for
    
                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                            this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    } finally {
    
                        bufferCQ.release();
                    }
                } // end of if (bufferCQ != null)
                else {
                    long cqMinOffset = cq.getMinOffsetInQueue();
                    if (offset < cqMinOffset) {
                        failScheduleOffset = cqMinOffset;
                        log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                            + cqMinOffset + ", queueId=" + cq.getQueueId());
                    }
                }
            } // end of if (cq != null)
    
            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                failScheduleOffset), DELAY_FOR_A_WHILE);
        }
    

总结

rocketmq 先将不同延时等级的消息存入内部对应延时队列中,然后不断的从延时队列中拉取消息判断是否到期,然后进行投递到对应的topic中。

通过固定延时等级的方式,同一个队列中的消息都是相同的延时等级,不需要对消息进行排序,只需要按顺序拉取消息判断是否可以投递就行了。但也限制了延时时间。

另外,因为只要延时消息存入延时队列中,就会写入commitlog文件中,然后rocketmq的高可用(同步复制或异步复制)就会将消息复制到slave中,从而保证延时消息的可靠性。

虽然rocketmq不支持任意延时时间,但相比于rabbitmq的死信消息,仍然提供了18个延时等级,基本也能覆盖很多场景了。

另外:后面又看到了去哪了开源的qmq,貌似支持任意延迟时间,感觉也可以学习一波。

以上是关于rocketmq实现延时队列的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ延时消息实现原理探究

Spring boot实战项目整合阿里云RocketMQ 消息队列实现发送普通消息,延时消息

Spring boot实战项目整合阿里云RocketMQ 消息队列实现发送普通消息,延时消息

Spring boot实战项目整合阿里云RocketMQ 消息队列实现发送普通消息,延时消息

如果有人再问你怎么实现分布式延时消息,这篇文章丢给他

RocketMQ实现延迟队列精确到秒级实现