Apache RocketMQ:理解官方的Broadcasting的例子
Posted 你是小KS
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache RocketMQ:理解官方的Broadcasting的例子相关的知识,希望对你有一定的参考价值。
1. 声明
当前内容主要用于学习和理解官方的Broadcasting,当前内容参考官方文档
翻译后:广播是向一个主题的所有订户发送信息。如果想让所有订阅者都收到关于某个主题的消息,广播是一个不错的选择。
消费者组中的所有消费者默认是均衡消费当前消费组的消息(100个消息发到消费组中2个,那么每个消费者默认消费是50个消息)
不同的消费组对于相同的订阅都是消费相同个数的(A消费组100,B消费组100),如果每个组中的消费者都只有1个,那么就类似广播
2. 消费组消费消息(订阅有相同的topic)
消费者1:
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
/*
* consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getMQAdminImpl()
*/
// Specify name server addresses.
consumer.setNamesrvAddr("192.168.1.102:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
AtomicInteger increment =new AtomicInteger();
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
//System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
for (MessageExt messageExt : msgs) {
try {
String msg = new String(messageExt.getBody(),RemotingHelper.DEFAULT_CHARSET).intern();
System.out.println("Receive New Messages: "+msg);
} catch (UnsupportedEncodingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
int incrementAndGet = increment.incrementAndGet();
System.out.println("消费次数==》"+incrementAndGet);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
消费者2:和消费者1的代码完全一样,就是订阅不同
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
生产者:与一般生产者没有任何区别!
public class BroadcastProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("192.168.1.102:9876");
producer.start();
for (int i = 0; i < 8; i++){
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
启动消费者1,消费者2,生产者(消费者完全启动后才让生产者启动)
消费者1
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=localhost, queueId=2, storeSize=212, queueOffset=125, sysFlag=0, bornTimestamp=1635646863458, bornHost=/192.168.1.101:60614, storeTimestamp=1635646861869, storeHost=/192.168.1.102:10911, msgId=C0A8016600002A9F0000000000033652, commitLogOffset=210514, bodyCRC=198614610, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=126, KEYS=OrderID188, CONSUME_START_TIME=1635646863515, UNIQ_KEY=7F0000014BA473D16E939CB760620000, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100], transactionId='null'}]]
消费次数==》1
ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=localhost, queueId=3, storeSize=212, queueOffset=125, sysFlag=0, bornTimestamp=1635646863505, bornHost=/192.168.1.101:60614, storeTimestamp=1635646861887, storeHost=/192.168.1.102:10911, msgId=C0A8016600002A9F0000000000033726, commitLogOffset=210726, bodyCRC=198614610, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=126, KEYS=OrderID188, CONSUME_START_TIME=1635646863527, UNIQ_KEY=7F0000014BA473D16E939CB760910001, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100], transactionId='null'}]]
消费次数==》2
ConsumeMessageThread_3 Receive New Messages: [MessageExt [brokerName=localhost, queueId=2, storeSize=212, queueOffset=126, sysFlag=0, bornTimestamp=1635646863569, bornHost=/192.168.1.101:60614, storeTimestamp=1635646861948, storeHost=/192.168.1.102:10911, msgId=C0A8016600002A9F00000000000339A2, commitLogOffset=211362, bodyCRC=198614610, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=127, KEYS=OrderID188, CONSUME_START_TIME=1635646863585, UNIQ_KEY=7F0000014BA473D16E939CB760D10004, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100], transactionId='null'}]]
消费次数==》3
ConsumeMessageThread_4 Receive New Messages: [MessageExt [brokerName=localhost, queueId=3, storeSize=212, queueOffset=126, sysFlag=0, bornTimestamp=1635646863585, bornHost=/192.168.1.101:60614, storeTimestamp=1635646861969, storeHost=/192.168.1.102:10911, msgId=C0A8016600002A9F0000000000033A76, commitLogOffset=211574, bodyCRC=198614610, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=127, KEYS=OrderID188, CONSUME_START_TIME=1635646863600, UNIQ_KEY=7F0000014BA473D16E939CB760E10005, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100], transactionId='null'}]]
消费次数==》4
消费者2:
Receive New Messages: Hello world
消费次数==》1
Receive New Messages: Hello world
消费次数==》2
Receive New Messages: Hello world
消费次数==》3
Receive New Messages: Hello world
消费次数==》4
两个消费着属于同一个组默认是按照均衡方式消费所有订阅消息的(必须有相同的订阅)
3. 广播模式
设置消费者1的模式为广播模式,消费者2保持不变,删除该topic(避免数据干扰)
consumer.setMessageModel(MessageModel.BROADCASTING);
重启消费者1,消费者2,最后启动生产者(将生产者的生产消息数量修改为100)
消费者2
消费者1
这里有个问题:将生产者的消息调小到8的时候有时会发现消费者1的广播收不到消息的问题,消费者2却正常消费一半的消息....
个人理解:广播在rocketmq中是消费者的设置,让单个消费者可以在特殊的情况下消费整个topic的消息
4.总结
- 广播模式设置是在消费者那边设置的,可以额外的消费该topic的所有消息,其他的都是均衡消费的
- 一个消费组中的消费者对于同一topic是均衡消费的
- 可以让一个消费者单独使用一个消费组实现类似的广播消费
以上是关于Apache RocketMQ:理解官方的Broadcasting的例子的主要内容,如果未能解决你的问题,请参考以下文章
Apache RocketMQ:理解官方的Broadcasting的例子
Apache RocketMQ:理解官方的BatchFilter设置用户属性的例子
Apache RocketMQ:理解官方的BatchFilter设置用户属性的例子