RocketMQ 定时消息
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ 定时消息相关的知识,希望对你有一定的参考价值。
参考技术A 消息在发送到消息队列RocketMQ版服务端后并不会立马投递,而是根据消息中的属性延迟固定时间后才投递给消费者。比如:常见的场景电商交易中超时未支付关闭订单的场景,通过延时消息在30分钟后投递给消息端进行关单。
开源 RocketMQ 针对目前只支持固定精度的定时消息。生产端发送消息,通过设delayTimeLevel时间级别后,可实现消息不立马被消费者消费到,而是按照18个级别 ("1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";)。比如支持5秒、10秒的Level,那么用户只能发送5秒延迟或者10秒延迟,不能发送8秒延迟的消息。
优点: 设计简单,把所有相同延迟时间的消息都先放到一个队列中,定时扫描,可以保证消息消费的有序性
缺点: 定时器采用了timer,timer是单线程运行,如果延迟消息数量很大的情况下,可能单线程处理不过来,造成消息到期后也没有发送出去的情况
五.RocketMQ极简入门-RocketMQ延迟消息
使用场景
我们通常使用定时任务比如Quartz来解决超时业务,比如:订单支付超时关单,VIP会员超时提醒。但是使用定时任务来处理这些业务场景在数据量大的时候并不是一个很好的选择,会造成大量的空扫描浪费性能。我们可以考虑使用延迟消息来解决。
概述
延迟消息即:把消息写到Broker后需要延迟一定时间才能被消费 , 在RocketMQ中消息的延迟时间不能任意指定,而是由特定的等级(1 到 18)来指定,分别有:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
可以通过修改配置来增加级别,比如在mq安装目录的 broker.conf 文件中增加如:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 2d
增加了2d 两天,这个时候总共就有19个level。
延迟消息工作原理
延迟队列工作流程图
RocketMQ Broker端在存储生产者写入的消息时,首先都会将其写入到CommitLog中。之后根据消息中的Topic信息和队列信息,将其转发到目标Topic的指定队列(ConsumeQueue)中。不过,在分发之前,系统会先判断消息中是否带有延时等级。若没有,则直接正常分发;如果有就走延迟队列,执行下面的流程
-
修改消息Topic的名字为SCHEDULE_TOPIC_XXXX
-
根据延时等级,在consumequeue目录中SCHEDULE_TOPIC_XXXX主题下创建出相应的queueId
目录与consumequeue文件 -
修改消息索引单元,计算出的投递时间当做消息Tag的哈希值存储到CosumeQueue中,投递时间 = 消息存储时间 + 延时等级时间 。下面是CosumeQueue单个存储单元组成结构如下
- Commit Log Offset:记录在CommitLog中的位置。
- Size:记录消息的大小
- Message Tag HashCode:记录消息Tag的哈希值,用于消息过滤。特别的,对于延迟消息,这个字段记录的是消息的投递时间戳。
-
将消息索引写入到SCHEDULE_TOPIC_XXXX主题下相应的consumequeue中
-
Broker内部有⼀个延迟消息服务类ScheuleMessageService,根据延迟级别数,创建对应数量的定时器Timer,定时消费SCHEDULE_TOPIC_XXXX中的消息,并投递到目标Topic中。
-
在将消息到期后,队列的Level等级改为0,作为一条普通消息,投递到目标Topic。
延迟消息实战
消息发送方
给消息设置延迟级别,API:message.setDelayTimeLevel(3);
public class Producer {
//演示消息同步发送
public static void main(String[] args) throws Exception {
//生产者
DefaultMQProducer producer = new DefaultMQProducer("syn-producerGroup-delay");
//设置name server地址
producer.setNamesrvAddr("127.0.0.1:9876");
//启动
producer.start();
for (long i = 0 ; i < 4 ; i++){
Order order = new Order(i,"订单"+i,"创建");
//添加内容
byte[] bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
//创建消息,指定:TOPIC 和 TAG
Message message = new Message("topic-order-delay","product-order-delay",bytes);
//延迟级别 3,代表 10s延迟
message.setDelayTimeLevel(3);
message.setKeys("key-"+i);
//执行发送
SendResult result = producer.send(message);
System.out.println("发送时间:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
System.out.println(result);
}
producer.shutdown();
}
}
消息消费方
public class Consumer {
public static void main(String[] args) throws Exception {
//创建消费者
DefaultMQPushConsumer defaultMQPushConsumer =
new DefaultMQPushConsumer("syn-consumerGroup-delay");
//设置name server 地址
defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
//从开始位置消费
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅
defaultMQPushConsumer.subscribe("topic-order-delay","product-order-delay");
defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
list.forEach(message->{
System.out.println("消费时间:"+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
defaultMQPushConsumer.start();
}
}
从消费者消费结果时间来看,消息是延迟了10s后才收到。
文章结束,创作不易,大佬给个好评。
以上是关于RocketMQ 定时消息的主要内容,如果未能解决你的问题,请参考以下文章