RocketMQ使用顺序消息
Posted 乐观男孩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ使用顺序消息相关的知识,希望对你有一定的参考价值。
说明
RocketMQ与其它消息队列一样,一个Topic利用多个队列来存储数据,单个队列内的数据是顺序存储的,但队列间的数据无法保证顺序性。RocketMQ目前支持保证某类数据或部分数据的顺序性。
核心思想是:发送消息时,可以通过实现MessageQueueSelector接口,选择消息发送到哪个队列,从而保证某类数据的顺序性。同时,可以在send方法中指定入参,方便MessageQueueSelector接口内部根据入参选择指定的队列。
生产端
@Test
public void sendMessageOrderly() throws Exception
DefaultMQProducer defaultMQProducer = RocketMqUtil.getDefaultMQProducer();
int arg = 3;
for (int i = 0; i < 3; i++)
Message message = new Message(RocketMqUtil.TOPIC, "orderly", "顺序消息".getBytes(Charset.forName("UTF-8")));
//为了保证消息顺序,则消息发送到同一个队列中,可通过MessageQueueSelector实现
// 可以通过arg参数在内部协助计算发送到哪个队列
defaultMQProducer.send(message, new MessageQueueSelector()
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg)
//根据参数选定消息发送到哪个队列,确保同类消息在同一队列中,以确保消息是按顺序存放
int index = ((int) arg) % mqs.size();
log.info("队列数:,当前队列数:", mqs.size(), index);
return mqs.get(index);
, arg);
defaultMQProducer.shutdown();
运行效果:
从上图可以看到,消息每次都是发送到同一个队列(编号为3的队列)。
RocketMQ上的消息:
消费端
@Test
public void consumeMessageOrderly () throws Exception
DefaultMQPushConsumer defaultMQPushConsumer = RocketMqUtil.getDefaultMQPushConsumer();
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
defaultMQPushConsumer.subscribe(RocketMqUtil.TOPIC, "*");
//消费监听器指定顺序消息监听器
defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly()
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context)
log.info("消费到消息条数:", msgs.size());
msgs.stream().map(messageExt -> new String(messageExt.getBody(), Charset.forName("UTF-8")))
.map(String::new).forEach(System.out::println);
return ConsumeOrderlyStatus.SUCCESS;
);
defaultMQPushConsumer.start();
Thread.sleep(5000L);
defaultMQPushConsumer.shutdown();
消费端代码与普通消息消费唯一的不同是消息监听器要使用MessageListenerOrderly接口类消息监听器。
消费结果:
总结
1、生产端,通过实现MessageQueueSelector接口,选择某一类消息发送到同一个queue,由于同一个queue是有序的,所以消息也是有序的。
2、消费端,使用MessageListenerOrderly作为消息的监听器。
以上是关于RocketMQ使用顺序消息的主要内容,如果未能解决你的问题,请参考以下文章