RocketMQ——Order Message(顺序消息)
Posted 1013wang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ——Order Message(顺序消息)相关的知识,希望对你有一定的参考价值。
生产者端消费者端运行效果补充
RocketMQ提供了3种模式的Producer:NormalProducer(普通)、OrderProducer(顺序)、TransactionProducer(事务),对应的分别是普通消息、顺序消息和事务消息。在前面的博客当中,涉及的都是NormalProducer,调用传统的send方法,消息是无序的。接下来,看看顺序消费。
模拟这样一个场景,如果一个用户完成一个订单需要3条消息,比如订单的创建、订单的支付、订单的发货,很显然,同一个用户的订单消息必须要顺序消费,但是不同用户之间的订单可以并行消费。
生产者端
看一下生产者端的代码:
DefaultMQProducer producer = new DefaultMQProducer("OrderProducer"); producer.setNamesrvAddr("192.168.99.9876"); producer.start(); String[] tags = new String[]{"createTag", "payTag", "sendTag"}; for (int orderId = 1; orderId <= 10; orderId++) { //订单消息 for (int type = 0; type < 3; type++) { //每种订单分为:创建订单/支付订单/发货订单 Message msg = new Message("OrderTopic", tag[type % tag.length], orderId + ":" + type, (orderId + ":" + type).getBytes() ); SendResult sendResult = producer.send(msg, new MessageQueueSelector(){ @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg){ Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId); System.out.println(sendResult); } }
注意:一个Message除了Topic/Tag外,还有Key的概念。
上图的send方法不同于以往,有一个MessageQueueSelector,将用于指定特定的消息发往特定的队列当中!
消费者端
看一下消费者端的代码:
consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { try { //模拟业务处理消息的时间 Thread.sleep(new Random().nextInt(1000)); System.out.println(new String(msgs.getBody(),"utf-8")); } catch (Exception e) { e.printStackTrace(); } return ConsumeOrderlyStatus.SUCCESS; } });
注意:在以前普通消费消息时设置的回调是MessageListenerConcurrently,而顺序消费的回调设置是MessageListenerOrderly。
运行效果
当我们启动2个Consumer进行消费时,可以观察到:
多个消费者消费的结果
可以观察得到,虽然从全局上来看,消息的消费不是有序的,但是每一个订单下的3条消息是顺序消费的!
其实,如果需要保证消息的顺序消费,那么很简单,首先需要做到一组需要有序消费的消息发往同一个broker的同一个队列上!其次消费者端采用有序Listener即可。
补充
这里,RocketMQ底层是如何做到消息顺序消费的,看一看源码你就能大概了解到,至少来说,在多线程消费场景下,一个线程只去消费一个队列上的消息,那么自然就保证了消息消费的顺序性,同时也保证了多个线程之间的并发性。也就是说其实broker并不能完全保证消息的顺序消费,它仅仅能保证的消息的顺序发送而已!
关于多线程消费这块,RocketMQ早就替我们想好了,这样设置即可:
想一想,在ActiveMQ中,我们如果想实现并发消费的话,恐怕还得搞个线程池提交任务吧,RocketMQ让我们的工作变得简单!
我这儿整理了比较全面的JAVA相关的面试资料,
需要领取面试资料的同学,请加群:473984645
以上是关于RocketMQ——Order Message(顺序消息)的主要内容,如果未能解决你的问题,请参考以下文章
RocketMQ——Transaction Message(事务消息)
分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(下)
RocketMQ入门到精通— RocketMQ初级特性能力 | Message Reliablity,消息可靠性(不能多也不能丢)如何解决?