rocketmq广播消息
Posted peachyy
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rocketmq广播消息相关的知识,希望对你有一定的参考价值。
发布与模式实现。广播就是向一个主题的所有订阅者发送同一条消息。
在发送消息的时候和普通的消息并与不同之处,只是在消费端做一些配置即可。
Consumer消息消费
public class BroadcastConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //设置为广播模式 默认为CLUSTERING模式 consumer.setMessageModel(MessageModel.BROADCASTING); consumer.subscribe("TopicTest", "TagA || TagC || TagD"); consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes = new AtomicLong(0); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(false); System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
以上是关于rocketmq广播消息的主要内容,如果未能解决你的问题,请参考以下文章