RocketMQ使用广播消息
Posted 乐观男孩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ使用广播消息相关的知识,希望对你有一定的参考价值。
说明
RocketMQ消息模式主要有两种:
(1)、MessageModel.CLUSTERING:集群模式。同一消费者组内的每个消费者,只消费到Topic的一部分消息,所有消费者消费的消息加起来就是Topic的所有消息。
(2)、MessageModel.BROADCASTING:广播模式。同一消费者组内的每个消费者,都消费到Topic的所有消息。如Topic有100条消息,则同个消费者组下的所有消费者都能消费到100条消息。
消息广播,主要配置在于消费者通过配置消息模式MessageModel)为MessageModel.BROADCASTING实现。
生产端
@Test
public void sendMessage() throws Exception
DefaultMQProducer defaultMQProducer = RocketMqUtil.getDefaultMQProducer();
Message message = new Message(RocketMqUtil.TOPIC, "broadcasting",
"broadcasting-message".getBytes(Charset.forName("UTF-8")));
//按正常操作发送消息
SendResult sendResult = defaultMQProducer.send(message);
log.info("发送消息结果:", sendResult.getSendStatus().name());
生产端按正常发送逻辑发送消息即可。
消费端
@Test
public void consumer() throws Exception
DefaultMQPushConsumer defaultMQPushConsumer = RocketMqUtil.getDefaultMQPushConsumer();
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
defaultMQPushConsumer.subscribe(RocketMqUtil.TOPIC, "*");
//设置消费消息为广播模式
defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently()
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context)
msgs.stream().map(MessageExt::getBody).map(String::new).forEach(body -> log.info("消息内容:", body));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
defaultMQPushConsumer.start();
Thread.sleep(5000L);
defaultMQPushConsumer.shutdown();
消费端,需要设置Consumer对象消费消息的消息模式为MessageModel.BROADCASTING。
总结
消息广播,主要在消费端通过对Consumer对象的消息模式(MessageModel)属性设置为MessageModel.BROADCASTING。
以上是关于RocketMQ使用广播消息的主要内容,如果未能解决你的问题,请参考以下文章