RocketMQ——Order Message(顺序消息)

Posted 1013wang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ——Order Message(顺序消息)相关的知识,希望对你有一定的参考价值。

生产者端消费者端运行效果补充
RocketMQ提供了3种模式的Producer:NormalProducer(普通)、OrderProducer(顺序)、TransactionProducer(事务),对应的分别是普通消息、顺序消息和事务消息。在前面的博客当中,涉及的都是NormalProducer,调用传统的send方法,消息是无序的。接下来,看看顺序消费。
模拟这样一个场景,如果一个用户完成一个订单需要3条消息,比如订单的创建、订单的支付、订单的发货,很显然,同一个用户的订单消息必须要顺序消费,但是不同用户之间的订单可以并行消费。

生产者端

看一下生产者端的代码:

DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
producer.setNamesrvAddr("192.168.99.9876");
producer.start();

String[] tags = new String[]{"createTag", "payTag", "sendTag"};

for (int orderId = 1; orderId <= 10; orderId++) {    //订单消息
    for (int type = 0; type < 3; type++) {            //每种订单分为:创建订单/支付订单/发货订单
        Message msg = new Message("OrderTopic", 
                tag[type % tag.length], 
                orderId + ":" + type,
                (orderId + ":" + type).getBytes()
        );
        SendResult sendResult = producer.send(msg, new MessageQueueSelector(){
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg){
                Integer id = (Integer) arg;
                int index = id % mqs.size();
                return mqs.get(index);
            }
        }, orderId);        
        System.out.println(sendResult);
    }
}
顺序消费-producer

注意:一个Message除了Topic/Tag外,还有Key的概念。
上图的send方法不同于以往,有一个MessageQueueSelector,将用于指定特定的消息发往特定的队列当中!

消费者端

看一下消费者端的代码:

consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        try {        
            //模拟业务处理消息的时间
            Thread.sleep(new Random().nextInt(1000));
            System.out.println(new String(msgs.getBody(),"utf-8"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});
顺序消费-consumer

注意:在以前普通消费消息时设置的回调是MessageListenerConcurrently,而顺序消费的回调设置是MessageListenerOrderly。

运行效果

当我们启动2个Consumer进行消费时,可以观察到:

技术图片

 

 多个消费者消费的结果
可以观察得到,虽然从全局上来看,消息的消费不是有序的,但是每一个订单下的3条消息是顺序消费的!
其实,如果需要保证消息的顺序消费,那么很简单,首先需要做到一组需要有序消费的消息发往同一个broker的同一个队列上!其次消费者端采用有序Listener即可。
补充
这里,RocketMQ底层是如何做到消息顺序消费的,看一看源码你就能大概了解到,至少来说,在多线程消费场景下,一个线程只去消费一个队列上的消息,那么自然就保证了消息消费的顺序性,同时也保证了多个线程之间的并发性。也就是说其实broker并不能完全保证消息的顺序消费,它仅仅能保证的消息的顺序发送而已!
关于多线程消费这块,RocketMQ早就替我们想好了,这样设置即可:

技术图片

 

 

消费多线程设置

 

想一想,在ActiveMQ中,我们如果想实现并发消费的话,恐怕还得搞个线程池提交任务吧,RocketMQ让我们的工作变得简单!

 

我这儿整理了比较全面的JAVA相关的面试资料,

技术图片


需要领取面试资料的同学,请加群:473984645

技术图片

以上是关于RocketMQ——Order Message(顺序消息)的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ 源码合集

RocketMQ 源码合集

RocketMQ——Transaction Message(事务消息)

RocketMQ 源码分析 —— Message 存储

分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(下)

RocketMQ入门到精通— RocketMQ初级特性能力 | Message Reliablity,消息可靠性(不能多也不能丢)如何解决?