RocketMQ使用顺序消息

Posted 乐观男孩

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ使用顺序消息相关的知识,希望对你有一定的参考价值。

说明

RocketMQ与其它消息队列一样,一个Topic利用多个队列来存储数据,单个队列内的数据是顺序存储的,但队列间的数据无法保证顺序性。RocketMQ目前支持保证某类数据或部分数据的顺序性。
核心思想是:发送消息时,可以通过实现MessageQueueSelector接口,选择消息发送到哪个队列,从而保证某类数据的顺序性。同时,可以在send方法中指定入参,方便MessageQueueSelector接口内部根据入参选择指定的队列。

生产端

@Test
public void sendMessageOrderly() throws Exception 
    DefaultMQProducer defaultMQProducer = RocketMqUtil.getDefaultMQProducer();
    int arg = 3;
    for (int i = 0; i < 3; i++) 
        Message message = new Message(RocketMqUtil.TOPIC, "orderly", "顺序消息".getBytes(Charset.forName("UTF-8")));
        //为了保证消息顺序,则消息发送到同一个队列中,可通过MessageQueueSelector实现
        // 可以通过arg参数在内部协助计算发送到哪个队列
        defaultMQProducer.send(message, new MessageQueueSelector() 
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) 
                //根据参数选定消息发送到哪个队列,确保同类消息在同一队列中,以确保消息是按顺序存放
                int index = ((int) arg) % mqs.size();
                log.info("队列数:,当前队列数:", mqs.size(), index);
                return mqs.get(index);
            
        , arg);
    
    defaultMQProducer.shutdown();

运行效果:

从上图可以看到,消息每次都是发送到同一个队列(编号为3的队列)。

RocketMQ上的消息:

消费端

@Test
public void consumeMessageOrderly () throws Exception 
    DefaultMQPushConsumer defaultMQPushConsumer = RocketMqUtil.getDefaultMQPushConsumer();
    defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    defaultMQPushConsumer.subscribe(RocketMqUtil.TOPIC, "*");
    //消费监听器指定顺序消息监听器
    defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() 
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                                                   ConsumeOrderlyContext context) 
            log.info("消费到消息条数:", msgs.size());
            msgs.stream().map(messageExt -> new String(messageExt.getBody(), Charset.forName("UTF-8")))
                    .map(String::new).forEach(System.out::println);
            return ConsumeOrderlyStatus.SUCCESS;
        
    );
    defaultMQPushConsumer.start();
    Thread.sleep(5000L);
    defaultMQPushConsumer.shutdown();

消费端代码与普通消息消费唯一的不同是消息监听器要使用MessageListenerOrderly接口类消息监听器。

消费结果:

总结

1、生产端,通过实现MessageQueueSelector接口,选择某一类消息发送到同一个queue,由于同一个queue是有序的,所以消息也是有序的。
2、消费端,使用MessageListenerOrderly作为消息的监听器。

以上是关于RocketMQ使用顺序消息的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ(04)——发送顺序消息

RocketMQ源码 — 十 RocketMQ顺序消息

RocketMQ源码 — 十 RocketMQ顺序消息

RocketMQ源码 — 十 RocketMQ顺序消息

RocketMQ学习笔记:消息发送模式

RocketMQ使用顺序消息