详解RocketMQ 顺序消费机制
Posted 华为云开发者社区
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了详解RocketMQ 顺序消费机制相关的知识,希望对你有一定的参考价值。
摘要:顺序消息是指对于一个指定的 Topic ,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。
本文分享自华为云社区《RocketMQ 顺序消费机制》,作者: 勇哥java实战分享 。
顺序消息是指对于一个指定的 Topic ,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。
顺序消息分为分区顺序消息和全局顺序消息。
1、分区顺序消息
对于指定的一个 Topic ,所有消息根据 Sharding Key 进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。
- 适用场景:适用于性能要求高,以 Sharding Key 作为分区字段,在同一个区块中严格地按照先进先出(FIFO)原则进行消息发布和消费的场景。
- 示例:电商的订单创建,以订单 ID 作为 Sharding Key ,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。
2、全局顺序消息
对于指定的一个 Topic ,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。
- 适用场景:适用于性能要求不高,所有的消息严格按照 FIFO 原则来发布和消费的场景。
- 示例:在证券处理中,以人民币兑换美元为 Topic,在价格相同的情况下,先出价者优先处理,则可以按照 FIFO 的方式发布和消费全局顺序消息。
全局顺序消息实际上是一种特殊的分区顺序消息,即 Topic 中只有一个分区,因此全局顺序和分区顺序的实现原理相同。
因为分区顺序消息有多个分区,所以分区顺序消息比全局顺序消息的并发度和性能更高。
消息的顺序需要由两个阶段保证:
- 消息发送
如上图所示,A1、B1、A2、A3、B2、B3 是订单 A 和订单 B 的消息产生的顺序,业务上要求同一订单的消息保持顺序,例如订单 A 的消息发送和消费都按照 A1、A2、A3 的顺序。
如果是普通消息,订单A 的消息可能会被轮询发送到不同的队列中,不同队列的消息将无法保持顺序,而顺序消息发送时 RocketMQ 支持将 Sharding Key 相同(例如同一订单号)的消息序路由到一个队列中。
- 消息消费
消费者消费消息时,需要保证消息消费顺序和存储顺序一致,最终实现消费顺序和发布顺序的一致。
我们知道负载均衡服务是客户端开始消费的起点。在负载均衡阶段,并发消费和顺序消费并没有什么大的差别,最大的差别在于:向 Borker 申请锁 。
消费者根据分配的队列 messageQueue ,向 Borker 申请锁 ,如果申请成功,则会拉取消息,如果失败,则定时任务每隔20秒会重新尝试。
见上图,顺序消费核心流程如下:
1、 组装成消费对象
2、 将请求对象提交到消费线程池
和并发消费不同的是,这里的消费请求包含消费快照 processQueue ,消息队列 messageQueue 两个对象,并不对消息列表做任何处理。
3、 消费线程内,对消费队列加锁
4、 从消费快照中取得待消费的消息列表
消费快照 processQueue 对象里,创建了一个红黑树对象 consumingMsgOrderlyTreeMap 用于临时存储的待消费的消息。
5、 执行消息监听器
执行监听器逻辑容易理解,消费快照的消费锁 consumeLock的作用是:防止 Rebalance 线程把当前消费的 MessageQueue 对象移除掉。
6、 处理消费结果
消费成功时,首先计算需要提交的偏移量,然后更新本地消费进度。
消费失败时,分两种场景:
- 假如已消费次数小于最大重试次数,则将放入对象 consumingMsgOrderlyTreeMap 用例临时存储的待消费的消息,重新加入到消费快照红黑树msgTreeMap中,然后使用定时任务尝试重新消费。
- 假如已消费次数大于等于最大重试次数,则将失败消息发送到 Broker ,Broker 接收到消息后,会加入到死信队列里 , 最后计算需要提交的偏移量,然后更新本地消费进度。
我们做一个关于顺序消费的总结:
- 顺序消费需要由两个阶段消息发送和消息消费协同配合,底层支撑依靠的是 RocketMQ 的存储模型;
- 顺序消费服务启动后,通过三把锁的机制,消息队列 messageQueue 的数据都会被消费者实例单线程的执行消费;
- 假如消费者扩容,消费者重启,或者 Broker 宕机 ,顺序消费也会有一定几率较短时间内乱序,所以消费者的业务逻辑还是要保障幂等。
RocketMQ 消息详解
MQ
MQ 架构:
Message 包含内容:
RocketMQ 客户端坐标:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.2</version>
</dependency>
消息发送&消费模式
注意:当生产者出现找不到 topic 的报错信息时,要检查是否服务器的防火墙策略导致的。
One-To-One(单生产者单消费者)
生产者
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class Producer
public static void main(String[] args) throws Exception
// 1.创建一个发送消息的对象Producer
DefaultMQProducer producer = new DefaultMQProducer("group1"); // 入参为自定义组名
// 2.设定发送的命名服务器地址
producer.setNamesrvAddr("192.168.3.244:9876");
// 设置发送消息超时时间(默认为3000)
producer.setSendMsgTimeout(60000);
// 3.启动发送的服务
producer.start();
// 4.创建要发送的消息对象,指定topic和body
Message msg = new Message("topic1", "hello rocketmq".getBytes("UTF-8"));
// 5.发送单条消息
SendResult result = producer.send(msg);
// 打印返回消息
System.out.println("返回结果:"+result);
// 6.关闭连接
producer.shutdown();
运行结果:
返回结果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E658458644D464D61A1F80000, offsetMsgId=C0A803DE00002A9F0000000000000000, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=3], queueOffset=0]
消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer
public static void main(String[] args) throws Exception
// 1.创建一个接收消息的对象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); // 入参为自定义组名
// 2.设定接收的命名服务器地址
consumer.setNamesrvAddr("192.168.3.244:9876");
// 3.设置接收消息对应的topic,对应的sub标签为任意*
consumer.subscribe("topic1", "*");
// 4.开启监听,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently()
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext)
// 遍历消息
for(MessageExt msg : list)
// System.out.println("收到消息:"+msg);
System.out.println("消息:" + new String(msg.getBody()));
// 成功处理后,mq 收到该标记,则相同的消息将不会再次发给消费者
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
// 5.启动接收消息的服务
consumer.start(); // 开启多线程监控消息,会持续运行
System.out.println("接收消息服务已开启运行");
运行结果:
接收消息服务已开启运行
消息:hello rocketmq
One-To-Many(单生产者多消费者)
生产者
package onetoone;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class Producer
public static void main(String[] args) throws Exception
// 1.创建一个发送消息的对象Producer
DefaultMQProducer producer = new DefaultMQProducer("group1"); // 入参为自定义组名
// 2.设定发送的命名服务器地址
producer.setNamesrvAddr("192.168.3.222:9876");
// 设置发送消息超时时间(默认为3000)
producer.setSendMsgTimeout(60000);
// 3.启动发送的服务
producer.start();
// 创建要发送的消息对象,指定topic和body
// Message msg = new Message("topic1", "hello rocketmq".getBytes("UTF-8"));
// 发送单条消息
// SendResult result = producer.send(msg);
// 4. 发送多条消息
for (int i = 1; i <= 10; i++)
Message msg = new Message("topic1", ("生产者:hello rocketmq "+i).getBytes("UTF-8"));
SendResult result = producer.send(msg);
System.out.println("返回结果:" + result);
// 5.关闭连接
producer.shutdown();
运行结果:
返回结果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EF300000, offsetMsgId=C0A803DE00002A9F0000000000000E79, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=2], queueOffset=4]
返回结果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EF7A0001, offsetMsgId=C0A803DE00002A9F0000000000000F2A, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=3], queueOffset=6]
返回结果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EF980002, offsetMsgId=C0A803DE00002A9F0000000000000FDB, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=0], queueOffset=6]
返回结果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EFAB0003, offsetMsgId=C0A803DE00002A9F000000000000108C, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=1], queueOffset=5]
返回结果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EFB70004, offsetMsgId=C0A803DE00002A9F000000000000113D, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=2], queueOffset=5]
返回结果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EFCC0005, offsetMsgId=C0A803DE00002A9F00000000000011EE, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=3], queueOffset=7]
返回结果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EFDF0006, offsetMsgId=C0A803DE00002A9F000000000000129F, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=0], queueOffset=7]
返回结果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EFEB0007, offsetMsgId=C0A803DE00002A9F0000000000001350, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=1], queueOffset=6]
返回结果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EFF20008, offsetMsgId=C0A803DE00002A9F0000000000001401, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=2], queueOffset=6]
返回结果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E7DB058644D464D70EFFD0009, offsetMsgId=C0A803DE00002A9F00000000000014B2, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=3], queueOffset=8]
消费者
负载均衡模式
负载均衡模式:即生产的多个消息(数量)会均衡分配给各个消费者。
package onetoone;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
public class Consumer
public static void main(String[] args) throws Exception
// 1.创建一个接收消息的对象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); // 入参为自定义组名
// 2.设定接收的命名服务器地址
consumer.setNamesrvAddr("192.168.3.222:9876");
// 3.设置接收消息对应的topic,对应的sub标签为任意*
consumer.subscribe("topic1", "*");
// 设置当前消费者的消费模式:负载均衡(也是默认模式)
consumer.setMessageModel(MessageModel.CLUSTERING);
// 4.开启监听,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently()
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext)
// 遍历消息
for(MessageExt msg : list)
// System.out.println("收到消息:"+msg);
System.out.println("消息:" + new String(msg.getBody()));
// 成功处理后,mq 收到该标记,则相同的消息将不会再次发给消费者
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
// 5.启动接收消息的服务
consumer.start(); // 开启多线程监控消息,会持续运行
System.out.println("接收消息服务已开启运行");
运行结果:
广播模式
广播模式:即生产出来的同一条消息,会被每个消费者所接收。
广播模式的现象:
- 如果先生产,后消费,则消息只能被消费一次。
- 如果多个消费者先启动(广播模式),后生产,此时才有广播的效果。
- 结论:必须先启动消费者,再启动生产者,才有广播的效果。
package onetoone;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
public class Consumer
public static void main(String[] args) throws Exception
// 1.创建一个接收消息的对象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); // 入参为自定义组名
// 2.设定接收的命名服务器地址
consumer.setNamesrvAddr("192.168.3.222:9876");
// 3.设置接收消息对应的topic,对应的sub标签为任意*
consumer.subscribe("topic1", "*");
// 设置当前消费者的消费模式为广播模式:所有客户端接收的消息都是一样的
consumer.setMessageModel(MessageModel.BROADCASTING);
// 4.开启监听,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently()
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext)
// 遍历消息
for(MessageExt msg : list)
// System.out.println("收到消息:"+msg);
System.out.println("消息:" + new String(msg.getBody()));
// 成功处理后,mq 收到该标记,则相同的消息将不会再次发给消费者
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
// 5.启动接收消息的服务
consumer.start(); // 开启多线程监控消息,会持续运行
System.out.println("接收消息服务已开启运行");
运行结果:
Many-To-Many(多生产者多消费者)
多生产者产生的消息可以被同一个消费者消费,也可以被多个消费者消费。
(演示:将上述示例的生产者启动多个实例即可)。
消息类别
同步消息
特征:即时性较强,通常是重要的且必须有回执的消息,例如短信、通知(转账成功)。
代码实现:
SendResult result = producer.send(msg);
异步消息
特征:即时性较弱,但需要有回执的消息,例如订单中的某些信息。
代码实现:
// 注意:回调处理结果必须在生产者进程结束前执行,否则回调无法正确执行
producer.send(msg, new SendCallback()
// 表示成功返回结果
public void onSuccess(SendResult sendResult)
System.out.println(sendResult);
// 表示发送消息失败
public void onException(Throwable t)
System.out.println(t);
);
// 注意后面不要马上shutdown(关闭)连接,确保留有时间接收到异步返回的消息
单向消息
特征:不需要有回执的消息,例如日志类的消息。
代码实现:
producer.sendOneway(msg);
延时消息
延时消息:指消息发送时并不直接发送到消息服务器,而是根据设定的等待时间才到达,起到延时到达的缓冲作用。
Message msg = new Message("topic3", ("延时消息:hello rocketmq "+i).getBytes("UTF-8"));
// 设置当前消息的延时效果
msg.setDelayTimeLevel(3); // 入参表示下标3,代表30s(参考下表)
SendResult result = producer.send(msg);
System.out.println("返回结果:"+result);
目前支持的消息时间:
- 秒级:1、5、10、30
- 分级:1~10、20、30
- 时级:1、2
- 入参下标含义:1s、5s、10s、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h
批量消息
// 创建消息集合创建要发送的消息对象,指定topic和body
ArrayList<Message> messageList = new ArrayList<>();
Message msg1 = new Message("topic1", "hello rocketmq 1".getBytes("UTF-8"));
Message msg2 = new Message("topic1", "hello rocketmq 2".getBytes("UTF-8"));
Message msg3 = new Message("topic1", "hello rocketmq 3".getBytes("UTF-8"));
messageList.add(msg1);
messageList.add(msg2);
messageList.add(msg3);
// (一次)发送批量消息
SendResult result = producer.send(messageList);
System.out.println("发送结果:"+result);
运行结果:
发送结果:SendResult [sendStatus=SEND_OK, msgId=A9FE135E681458644D4650530BBF0000,A9FE135E681458644D4650530BBF0001,A9FE135E681458644D4650530BBF0002, offsetMsgId=C0A803DE00002A9F0000000000001E3E,C0A803DE00002A9F0000000000001EE3,C0A803DE00002A9F0000000000001F88, messageQueue=MessageQueue [topic=topic1, brokerName=broker-a, queueId=2], queueOffset=13]
消息过滤
分类过滤
- 生产者:发送时指定 tag
Message msg = new Message("topic6", "tag2", ("消息过滤按照tag:hello rocketmq 2").getBytes("UTF-8"));
- 消费者:消费指定 tag 的消息
// 接收消息时,除了可以指定topic,还可以指定接收的tag(*代表任意tag,||代表或)
consumer.subscribe("topic6", "tag1 || tag2");
属性过滤(SQL 过滤)
生产者:
// 为消息添加属性
msg.putUserProperty("vip", "1");
msg.putUserProperty("age", "20");
消费者:
// 使用消息选择器来过滤对应的属性,语法格式为类SQL语法
consumer.subscribe("topic1", MessageSelector.bySql("age >= 18"));
配置:
- 注意:SQL 过滤需要依赖服务器的功能支持,在 broker.conf 配置文件中添加对应的功能项,并开启对应功能。
enablePropertyFilter=true
- 启动服务器使启用对应配置文件:
sh mqbroker -n localhost:9876 -c ../conf/broker.conf
消息顺序
消息乱序
-
默认情况下,MQ 开启了多个队列, 同时发送多个消息的的话,发送给哪个队列是不确定的。同时消费者读取消息,每读取一个消息开启一个线程,也不能保证消息的顺序性。
-
想要保证消息的有序性,需要指定消息发送时的队列。同时消费者应该一个队列开启一个线程进行接收,而不是一个消息一个线程。
顺序消息
生产者:
// 设置消息进入到指定的消息队列中
for(final Order order : orderList)
Message msg = new Message("orderTopic", order.toString().getBytes());
// 发送时要指定对应的消息队列选择器
SendResult result = producer.send(msg, new MessageQueueSelector()
// 设置当前消息发送时使用哪一个消息队列
public MessageQueue select(List<MessageQueue> list, Message message, Object o)
// 根据发送的信息不同,选择不同的消息队列
// 根据id来选择一个消息队列的对象(用id的哈希值来取模)
int mqIndex = order.getId().hashCode() % list.size();
return list.get(mqIndex);
, null);
System.out.println(result);
消费者:
// 使用单线程的模式从消息队列中取数据,一个线程绑定一个消息队列
consumer.registerMessageListener(new MessageListenerOrderly()
// 使用MessageListenerOrderly接口后,
// 对消息队列的处理由一个消息队列多个线程服务转化为一个消息队列一个线程服务
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext)
for(MessageExt msg : list)
System.out.println("消息:"+new String(msg.getBody()));
return ConsumeOrderlyStatus.SUCCESS;
);
事务消息
事务消息过程
事务消息状态
-
提交状态:允许进入队列,此消息与非事务消息无区别。
-
回滚状态:不允许进入队列,此消息等同于未发送过。
-
中间状态:完成了half消息的发送,未对 MQ 进行二次状态确认。
注意:事务消息仅与生产者有关,与消费者无关。
事务消息实现
// 事务消息使用的生产者是TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("192.168.184.128:9876");
// 添加本地事务对应的监听
producer.setTransactionListener(new TransactionListener()
// 正常事务过程
public LocalTransactionState executeLocalTransaction(Message message, Object o)
// 提交状态
return LocalTransactionState.COMMIT_MESSAGE;
// 回滚状态
return LocalTransactionState.ROLLBACK_MESSAGE;
// 中间状态
return LocalTransactionState.UNKNOW;
// 事务补偿过程
public LocalTransactionState checkLocalTransaction(MessageExt messageExt)
// 事务补偿过程必须保证服务器正在运行,否则将无法进行正常的事务补偿
return LocalTransactionState.COMMIT_MESSAGE;
// return null;
);
producer.start();
Message msg = new Message("topic8", ("事务消息:hello rocketmq ").getBytes("UTF-8"));
SendResult result = producer.sendMessageInTransaction(msg,null);
System.out.println("返回结果:"+result);
producer.shutdown();
以上是关于详解RocketMQ 顺序消费机制的主要内容,如果未能解决你的问题,请参考以下文章