RocketMQ使用广播消息

Posted 乐观男孩

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ使用广播消息相关的知识,希望对你有一定的参考价值。

说明

RocketMQ消息模式主要有两种:
(1)、MessageModel.CLUSTERING:集群模式。同一消费者组内的每个消费者,只消费到Topic的一部分消息,所有消费者消费的消息加起来就是Topic的所有消息。
(2)、MessageModel.BROADCASTING:广播模式。同一消费者组内的每个消费者,都消费到Topic的所有消息。如Topic有100条消息,则同个消费者组下的所有消费者都能消费到100条消息。

消息广播,主要配置在于消费者通过配置消息模式MessageModel)为MessageModel.BROADCASTING实现。

生产端

@Test
public void sendMessage() throws Exception 
    DefaultMQProducer defaultMQProducer = RocketMqUtil.getDefaultMQProducer();
    Message message = new Message(RocketMqUtil.TOPIC, "broadcasting",
            "broadcasting-message".getBytes(Charset.forName("UTF-8")));
    //按正常操作发送消息
    SendResult sendResult = defaultMQProducer.send(message);
    log.info("发送消息结果:", sendResult.getSendStatus().name());

生产端按正常发送逻辑发送消息即可。

消费端

@Test
public void consumer() throws Exception 
    DefaultMQPushConsumer defaultMQPushConsumer = RocketMqUtil.getDefaultMQPushConsumer();
    defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    defaultMQPushConsumer.subscribe(RocketMqUtil.TOPIC, "*");
    //设置消费消息为广播模式
    defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
    defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() 
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) 
            msgs.stream().map(MessageExt::getBody).map(String::new).forEach(body -> log.info("消息内容:", body));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        
    );
    defaultMQPushConsumer.start();
    Thread.sleep(5000L);
    defaultMQPushConsumer.shutdown();

消费端,需要设置Consumer对象消费消息的消息模式为MessageModel.BROADCASTING。

总结

消息广播,主要在消费端通过对Consumer对象的消息模式(MessageModel)属性设置为MessageModel.BROADCASTING。

以上是关于RocketMQ使用广播消息的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ系列广播与延迟消息

rocketmq广播消息

RocketMQ集群消息与广播消费

RocketMQ(05)——消息的群集消费和广播消费

rocketmq 以广播方式实现消费者消费消息

RocketMQ-广播模式消费