Apache RocketMQ:理解官方的Broadcasting的例子

Posted 你是小KS

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache RocketMQ:理解官方的Broadcasting的例子相关的知识,希望对你有一定的参考价值。

1. 声明

当前内容主要用于学习和理解官方的Broadcasting,当前内容参考官方文档

翻译后:广播是向一个主题的所有订户发送信息。如果想让所有订阅者都收到关于某个主题的消息,广播是一个不错的选择。

  1. 消费者组中的所有消费者默认是均衡消费当前消费组的消息(100个消息发到消费组中2个,那么每个消费者默认消费是50个消息)
  2. 不同的消费组对于相同的订阅都是消费相同个数的(A消费组100,B消费组100),如果每个组中的消费者都只有1个,那么就类似广播

2. 消费组消费消息(订阅有相同的topic)

消费者1:

public class Consumer 

    public static void main(String[] args) throws InterruptedException, MQClientException 

        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
		/*
		 * consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getMQAdminImpl()
		 */
        // Specify name server addresses.
        consumer.setNamesrvAddr("192.168.1.102:9876");

        // Subscribe one more more topics to consume.
        consumer.subscribe("TopicTest", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() 
        	AtomicInteger increment =new AtomicInteger();
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) 
                //System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                for (MessageExt messageExt : msgs) 
                	try 
						String msg = new String(messageExt.getBody(),RemotingHelper.DEFAULT_CHARSET).intern();
						System.out.println("Receive New Messages: "+msg);
					 catch (UnsupportedEncodingException e) 
						// TODO Auto-generated catch block
						e.printStackTrace();
					
				
                int incrementAndGet = increment.incrementAndGet();
                System.out.println("消费次数==》"+incrementAndGet);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            
        );

        //Launch the consumer instance.
        consumer.start();

        System.out.printf("Consumer Started.%n");
    

消费者2:和消费者1的代码完全一样,就是订阅不同

  consumer.subscribe("TopicTest", "TagA || TagC || TagD");

生产者:与一般生产者没有任何区别!

public class BroadcastProducer 
    public static void main(String[] args) throws Exception 
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("192.168.1.102:9876");
        producer.start();

        for (int i = 0; i < 8; i++)
            Message msg = new Message("TopicTest",
                "TagA",
                "OrderID188",
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        
        producer.shutdown();
    

启动消费者1,消费者2,生产者(消费者完全启动后才让生产者启动)
消费者1

ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=localhost, queueId=2, storeSize=212, queueOffset=125, sysFlag=0, bornTimestamp=1635646863458, bornHost=/192.168.1.101:60614, storeTimestamp=1635646861869, storeHost=/192.168.1.102:10911, msgId=C0A8016600002A9F0000000000033652, commitLogOffset=210514, bodyCRC=198614610, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Messagetopic='TopicTest', flag=0, properties=MIN_OFFSET=0, MAX_OFFSET=126, KEYS=OrderID188, CONSUME_START_TIME=1635646863515, UNIQ_KEY=7F0000014BA473D16E939CB760620000, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100], transactionId='null']]
消费次数==》1
ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=localhost, queueId=3, storeSize=212, queueOffset=125, sysFlag=0, bornTimestamp=1635646863505, bornHost=/192.168.1.101:60614, storeTimestamp=1635646861887, storeHost=/192.168.1.102:10911, msgId=C0A8016600002A9F0000000000033726, commitLogOffset=210726, bodyCRC=198614610, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Messagetopic='TopicTest', flag=0, properties=MIN_OFFSET=0, MAX_OFFSET=126, KEYS=OrderID188, CONSUME_START_TIME=1635646863527, UNIQ_KEY=7F0000014BA473D16E939CB760910001, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100], transactionId='null']]
消费次数==》2
ConsumeMessageThread_3 Receive New Messages: [MessageExt [brokerName=localhost, queueId=2, storeSize=212, queueOffset=126, sysFlag=0, bornTimestamp=1635646863569, bornHost=/192.168.1.101:60614, storeTimestamp=1635646861948, storeHost=/192.168.1.102:10911, msgId=C0A8016600002A9F00000000000339A2, commitLogOffset=211362, bodyCRC=198614610, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Messagetopic='TopicTest', flag=0, properties=MIN_OFFSET=0, MAX_OFFSET=127, KEYS=OrderID188, CONSUME_START_TIME=1635646863585, UNIQ_KEY=7F0000014BA473D16E939CB760D10004, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100], transactionId='null']]
消费次数==》3
ConsumeMessageThread_4 Receive New Messages: [MessageExt [brokerName=localhost, queueId=3, storeSize=212, queueOffset=126, sysFlag=0, bornTimestamp=1635646863585, bornHost=/192.168.1.101:60614, storeTimestamp=1635646861969, storeHost=/192.168.1.102:10911, msgId=C0A8016600002A9F0000000000033A76, commitLogOffset=211574, bodyCRC=198614610, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Messagetopic='TopicTest', flag=0, properties=MIN_OFFSET=0, MAX_OFFSET=127, KEYS=OrderID188, CONSUME_START_TIME=1635646863600, UNIQ_KEY=7F0000014BA473D16E939CB760E10005, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100], transactionId='null']]
消费次数==》4

消费者2:

Receive New Messages: Hello world
消费次数==》1
Receive New Messages: Hello world
消费次数==》2
Receive New Messages: Hello world
消费次数==》3
Receive New Messages: Hello world
消费次数==》4

两个消费着属于同一个组默认是按照均衡方式消费所有订阅消息的(必须有相同的订阅)

3. 广播模式

设置消费者1的模式为广播模式,消费者2保持不变,删除该topic(避免数据干扰)

consumer.setMessageModel(MessageModel.BROADCASTING);

重启消费者1,消费者2,最后启动生产者(将生产者的生产消息数量修改为100)
消费者2

消费者1

这里有个问题:将生产者的消息调小到8的时候有时会发现消费者1的广播收不到消息的问题,消费者2却正常消费一半的消息....

个人理解:广播在rocketmq中是消费者的设置,让单个消费者可以在特殊的情况下消费整个topic的消息

4.总结

  1. 广播模式设置是在消费者那边设置的,可以额外的消费该topic的所有消息,其他的都是均衡消费的
  2. 一个消费组中的消费者对于同一topic是均衡消费的
  3. 可以让一个消费者单独使用一个消费组实现类似的广播消费

以上是关于Apache RocketMQ:理解官方的Broadcasting的例子的主要内容,如果未能解决你的问题,请参考以下文章

Apache RocketMQ:理解官方的Broadcasting的例子

Apache RocketMQ:理解官方的BatchFilter设置用户属性的例子

Apache RocketMQ:理解官方的BatchFilter设置用户属性的例子

Apache RocketMQ:使用官方demo测试rocketmq

RocketMQ部署以及调优

RocketMQ架构原理及名词概念