RocketMQ入门到精通— RocketMQ初级特性能力 | Message Order,RocketMQ的消息可以是有序的哦
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ入门到精通— RocketMQ初级特性能力 | Message Order,RocketMQ的消息可以是有序的哦相关的知识,希望对你有一定的参考价值。
名言警句
任何先进的技术均与魔法无异
追本溯源
经历了6个月的失踪,我将带着干货终究归来!【RocketMQ入门到精通】
订阅与发布
消息的发布是指某个生产者向某个topic发送消息,消息的订阅是指某个消费者关注了某个topic中带有某些tag的消息,进而从该topic消费数据。
对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。在 Apache RocketMQ 中支持分区顺序消息,如下图所示。我们可以按照某一个标准对消息进行分区(比如图中的ShardingKey),同一个ShardingKey的消息会被分配到同一个队列中,并按照顺序被消费。
消息顺序
消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ可以严格的保证消息有序。
顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。
- 全局顺序 对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。
- 适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景
- 分区顺序 对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。
- 适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。
顺序消息发送
顺序消息是一种对消息发送和消费顺序有严格要求的消息。
RocketMQ 消息的顺序性分为两部分,生产顺序性和消费顺序性。只有同时满足了生产顺序性和消费顺序性才能达到上述的FIFO效果。
顺序消息分类
生产顺序性: RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。如需保证消息生产的顺序性,则必须满足以下条件:
- 单一生产者: 消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的分区键,不同生产者之间产生的消息也无法判定其先后顺序。
- 串行发送:生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。
满足以上条件的生产者,将顺序消息发送至服务端后,会保证设置了同一分区键的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:
顺序消息的应用场景也非常广泛,在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理。
例如,创建订单的场景,需要保证同一个订单的生成、付款和发货,这三个操作被顺序执行。如果是普通消息,订单A的消息可能会被轮询发送到不同的队列中,不同队列的消息将无法保持顺序,而顺序消息发送时将ShardingKey相同(同一订单号)的消息序路由到一个逻辑队列中。
顺序消息的一致性
如果一个Broker掉线,那么此时队列总数是否会发化?
如果发生变化,那么同一个ShardingKey的消息就会发送到不同的队列上,造成乱序。如果不发生变化,那消息将会发送到掉线Broker的队列上,必然是失败的。因此 Apache RocketMQ 提供了两种模式,如果要保证严格顺序而不是可用性,创建 Topic 是要指定 -o 参数(--order)为true,表示顺序消息。
$ sh bin/mqadmin updateTopic -c DefaultCluster -t TopicTest -o true -n 127.0.0.1:9876
create topic to 127.0.0.1:10911 success.
TopicConfig [topicName=TopicTest, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=true, attributes=null]
MessageQueueSelector的接口:
public interface MessageQueueSelector
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
其中mqs是可以发送的队列,msg是消息,arg是上述send接口中传入的Object对象,返回的是该消息需要发送到的队列。上述例子里,是以orderId作为分区分类标准,对所有队列个数取余,来对将相同orderId的消息发送到同一个队列中。
生产环境中建议选择最细粒度的分区键进行拆分,例如,将订单ID、用户ID作为分区键关键字,可实现同一终端用户的消息按照顺序处理,不同用户的消息无需保证顺序。
保证NameServer中的配置 orderMessageEnable
和 returnOrderTopicConfigToBroker
必须是 true。如果上述任意一个条件不满足,则是保证可用性而不是严格顺序。
顺序消息示例代码
public class Producer
public static void main(String[] args) throws UnsupportedEncodingException
try
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
String[] tags = new String[] "TagA", "TagB", "TagC", "TagD", "TagE";
for (int i = 0; i < 100; i++)
int orderId = i % 10;
Message msg =
new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector()
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg)
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
, orderId);
System.out.printf("%s%n", sendResult);
producer.shutdown();
catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e)
e.printStackTrace();
这里的区别主要是调用了SendResult send(Message msg, MessageQueueSelector selector, Object arg)
方法,MessageQueueSelector 是队列选择器,arg 是一个 Java Object 对象,可以传入作为消息发送分区的分类标准。
以上是关于RocketMQ入门到精通— RocketMQ初级特性能力 | Message Order,RocketMQ的消息可以是有序的哦的主要内容,如果未能解决你的问题,请参考以下文章
RocketMQ入门到精通— RocketMQ初级特性能力 | Message Filter消息过滤,可不是web容器的过滤器哦
RocketMQ入门到精通— RocketMQ初级特性能力 | Message Persistence,消息中间件通常采用的几种持久化方式,可以存到数据库里面甚至redis里,你知道不?
RocketMQ入门到精通— RocketMQ学习入门指南 | 精讲RocketMQ是什么
RocketMQ入门到精通— RocketMQ学习入门指南 | RocketMQ服务发现(Name Server)精讲
RocketMQ入门到精通— RocketMQ中级特性能力 | 解释一下顺序消息原理
(作者推荐)RocketMQ入门到精通— RocketMQ中级特性能力 | 长轮询Pull和Push模式你选择哪个?为什么?区别在哪里?(含源码分析)「Push篇」