RocketMQ的消费模式
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ的消费模式相关的知识,希望对你有一定的参考价值。
参考技术A RocketMQ消息订阅有两种模式,一种是Push模式(MQPushConsumer),即MQServer主动向消费端推送;另外一种是Pull模式(MQPullConsumer),即消费端在需要时,主动到MQ Server拉取。但在具体实现时,Push和Pull模式本质都是采用消费端主动拉取的方式,即consumer轮询从broker拉取消息。Push模式好处就是实时性高。不好处在于消费端的处理能力有限,当瞬间推送很多消息给消费端时,容易造成消费端的消息积压,严重时会压垮客户端。
Pull模式好处就是主动权掌握在消费端自己手中,根据自己的处理能力量力而行。缺点就是如何控制Pull的频率。定时间隔太久担心影响时效性,间隔太短担心做太多“无用功”浪费资源。比较折中的办法就是长轮询。
Push方式里,consumer把长轮询的动作封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。
Pull方式里,取消息的过程需要用户自己主动调用,首先通过打算消费的Topic拿到
MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。
RocketMQ使用长轮询机制来模拟Push效果,算是兼顾了二者的优点。
RocketMQ-广播模式消费
Rocketmq 消费者默认是集群的方式消费的,消费者还可以用广播的模式进行消费。广播模式消费就是所有订阅同一个主题的消费者都会收到消息。代码实现上其实很简单,就是在消费端添加
consumer.setMessageModel(MessageModel.BROADCASTING);
就可以了。我们看实验步骤:
一、启动ConsumerBroadCastMember1
二、启动ConsumerBroadCastMember2
三、运行ProducerBraodCast
四、我们可以看到两个Consumer都收到了同样的消息。
Producer端:
package org.hope.lee.producer; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendCallback; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageQueue; import com.alibaba.rocketmq.remoting.exception.RemotingException; public class ProducerBroadCast { public static void main(String[] args) { DefaultMQProducer producer = new DefaultMQProducer("push_consumer"); producer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876"); try { // 设置实例名称 producer.setInstanceName("producer_broadcast"); // 设置重试次数 producer.setRetryTimesWhenSendFailed(3); // 开启生产者 producer.start(); // 创建一条消息 Message msg = new Message("topic_broadcast", "TagA", "OrderID0034", "message_broadcast_test".getBytes()); SendResult send = producer.send(msg); System.out.println("id:--->" + send.getMsgId() + ",result:--->" + send.getSendStatus()); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } producer.shutdown(); } }
Consumer端:
package org.hope.lee.consumer; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; public class ConsumerBroadCastMember1 { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_broadcast"); consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876"); // 批量消费,每次拉取10条 consumer.setConsumeMessageBatchMaxSize(10); //设置广播消费 consumer.setMessageModel(MessageModel.BROADCASTING); //设置集群消费 // consumer.setMessageModel(MessageModel.CLUSTERING); // 如果非第一次启动,那么按照上次消费的位置继续消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 订阅PushTopic下Tag为push的消息 consumer.subscribe("topic_broadcast", "TagA || Tag B || Tage C"); consumer.registerMessageListener(new MqBroadCastListener()); consumer.start(); System.out.println("Consumer1 Started."); } } class MqBroadCastListener implements MessageListenerConcurrently{ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { MessageExt msg = msgs.get(0); String msgBody = new String(msg.getBody(), "utf-8"); System.out.println("msgBody:" + msgBody); } catch(Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
package org.hope.lee.consumer; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; public class ConsumerBroadCastMember2 { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_broadcast"); consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876"); // 批量消费,每次拉取10条 consumer.setConsumeMessageBatchMaxSize(10); //设置广播消费 consumer.setMessageModel(MessageModel.BROADCASTING); //设置集群消费 // consumer.setMessageModel(MessageModel.CLUSTERING); // 如果非第一次启动,那么按照上次消费的位置继续消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 订阅PushTopic下Tag为push的消息 consumer.subscribe("topic_broadcast", "TagA || Tag B || Tage C"); consumer.registerMessageListener(new MqBroadCastListener()); consumer.start(); System.out.println("Consumer2 Started."); } }
结果:
https://gitee.com/huayicompany/RocketMQ-learn/tree/master/rocketmq-api
以上是关于RocketMQ的消费模式的主要内容,如果未能解决你的问题,请参考以下文章