RocketMQ(11)——消费者拉模式和推模式
Posted elim168
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ(11)——消费者拉模式和推模式相关的知识,希望对你有一定的参考价值。
消费者拉模式和推模式
RocketMQ提供了两种消息的消费模式,拉模式和推模式。我们先来看一下拉模式消费的应用。
拉模式消费
拉模式消费使用的是DefaultMQPullConsumer,核心逻辑是先拿到需要获取消息的Topic对应的队列,然后依次从队列中拉取可用的消息。拉取了消息后就可以进行处理,处理完了需要更新消息队列的消费位置。下面代码就演示了使用DefaultMQPullConsumer拉取消息进行消费的示例。核心方法就是调用consumer的pull()
拉取消息。该示例中使用的是同步拉取,即需要等待Broker响应后才能继续往下执行。如果有需要也可以使用提供了PullCallback
的重载方法。同步的pull()
返回的是PullResult对象,其中的状态码有四种状态,可以看到示例代码中分别对四种状态进行了不同的处理。只有状态为FOUND才表示拉取到了消息,此时可以进行消费。消费完了需要调用updateConsumeOffset()
更新消息队列的消费位置,这样下次通过fetchConsumeOffset()
获取消费位置时才能获取到正确的位置。如果有需要,用户也可以自己管理消息的消费位置。
@Test
public void testPullConsumer() throws Exception
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("group1_pull");
consumer.setNamesrvAddr(this.nameServer);
String topic = "topic1";
consumer.start();
//获取Topic对应的消息队列
Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(topic);
int maxNums = 10;//每次拉取消息的最大数量
while (true)
boolean found = false;
for (MessageQueue messageQueue : messageQueues)
long offset = consumer.fetchConsumeOffset(messageQueue, false);
PullResult pullResult = consumer.pull(messageQueue, "tag8", offset, maxNums);
switch (pullResult.getPullStatus())
case FOUND:
found = true;
List<MessageExt> msgs = pullResult.getMsgFoundList();
System.out.println(messageQueue.getQueueId() + "收到了消息,数量----" + msgs.size());
for (MessageExt msg : msgs)
System.out.println(messageQueue.getQueueId() + "处理消息——" + msg.getMsgId());
long nextOffset = pullResult.getNextBeginOffset();
consumer.updateConsumeOffset(messageQueue, nextOffset);
break;
case NO_NEW_MSG:
System.out.println("没有新消息");
break;
case NO_MATCHED_MSG:
System.out.println("没有匹配的消息");
break;
case OFFSET_ILLEGAL:
System.err.println("offset错误");
break;
if (!found) //没有一个队列中有新消息,则暂停一会。
TimeUnit.MILLISECONDS.sleep(5000);
笔者的代码中在一次拉取请求返回,没有拉取到消息时会睡眠5秒,这只是一个简单的示例,实际应用中,这可能不是你想要的结果。你可能不希望没拉取到消息的时候睡眠一段时间,也不希望总是不断的重复尝试,这样你可以考虑使用pullBlockIfNotFound()
,使用它如果在Broker上没有新消息,会在Broker端阻塞一段时间,直到有新消息或超时发生,默认是30秒,该时间可以通过setConsumerTimeoutMillisWhenSuspend()
指定,但是RocketMQ官方不建议我们修改。使用pullBlockIfNotFound()
时注意不要像下面这样在一个线程中遍历所有的队列,然后还基于队列使用pullBlockIfNotFound()
,这会导致当前队列无消息时阻塞,而其它队列有新消息时需要等待当前队列拉取超时后才可以进行拉取。
@Test
public void testPullConsumer() throws Exception
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("group1_pull");
consumer.setNamesrvAddr(this.nameServer);
String topic = "topic1";
consumer.start();
//获取Topic对应的消息队列
Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(topic);
int maxNums = 10;//每次拉取消息的最大数量
while (true)
for (MessageQueue messageQueue : messageQueues)
long offset = consumer.fetchConsumeOffset(messageQueue, false);
PullResult pullResult = consumer.pullBlockIfNotFound(messageQueue, "tag8", offset, maxNums);
switch (pullResult.getPullStatus())
case FOUND:
List<MessageExt> msgs = pullResult.getMsgFoundList();
System.out.println(messageQueue.getQueueId() + "收到了消息,数量----" + msgs.size());
for (MessageExt msg : msgs)
System.out.println(messageQueue.getQueueId() + "处理消息——" + msg.getMsgId());
long nextOffset = pullResult.getNextBeginOffset();
consumer.updateConsumeOffset(messageQueue, nextOffset);
break;
case NO_NEW_MSG:
System.out.println("没有新消息");
break;
case NO_MATCHED_MSG:
System.out.println("没有匹配的消息");
break;
case OFFSET_ILLEGAL:
System.err.println("offset错误");
break;
拉取消息进行消费的时候消息可能会消费失败,消息消费失败后需要把它通过sendMessageBack()
再丢回Broker,丢回Broker后消息才能进行下一次消费。可以通过delayLevel指定一个延时级别,delayLevel=3表示延时10秒。尽管有消息消费失败,但是更新MessageQueue消费的offset时还是一样的更新。下面代码就展示了消息消费失败的处理。
@Test
public void testPullConsumer() throws Exception
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("group1_pull");
consumer.setNamesrvAddr(this.nameServer);
String topic = "topic1";
consumer.start();
//获取Topic对应的消息队列
Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(topic);
System.out.println(messageQueues);
int maxNums = 10;//每次拉取消息的最大数量
while (true)
boolean found = false;
for (MessageQueue messageQueue : messageQueues)
long offset = consumer.fetchConsumeOffset(messageQueue, false);
PullResult pullResult = consumer.pull(messageQueue, "tag8", offset, maxNums);
switch (pullResult.getPullStatus())
case FOUND:
found = true;
List<MessageExt> msgs = pullResult.getMsgFoundList();
System.out.println(messageQueue.getQueueId() + "收到了消息,数量----" + msgs.size());
for (MessageExt msg : msgs)
System.out.println(messageQueue.getQueueId() + "处理消息——" + msg.getMsgId());
if (new Random().nextInt(10) % 3 == 0)
consumer.sendMessageBack(msg, 3);
System.out.println("消息消费失败----" + msg.getMsgId());
long nextOffset = pullResult.getNextBeginOffset();
consumer.updateConsumeOffset(messageQueue, nextOffset);
break;
case NO_NEW_MSG:
System.out.println("没有新消息");
break;
case NO_MATCHED_MSG:
System.out.println("没有匹配的消息");
break;
case OFFSET_ILLEGAL:
System.err.println("offset错误");
break;
if (!found) //没有一个队列中有新消息,则暂停一会。
TimeUnit.MILLISECONDS.sleep(5000);
消息消费失败后通过sendMessageBack()
发回给Broker的消息会发送到以当前消费者组名称加上%RETRY%
前缀作为Topic的Topic上。比如上面代码消费者组名称是group1_pull
,那么对应的消费失败的消息发回去就会发到%RETRY%group1_pull
这个Topic上。我们如果需要消费这些失败的消息,就可以通过%RETRY%group1_pull
进行消息消费,比如下面这样。
@Test
public void testPullConsumeRetry() throws Exception
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("group1_pull");
consumer.setNamesrvAddr(this.nameServer);
String topic = "%RETRY%group1_pull";
consumer.start();
//获取Topic对应的消息队列
Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(topic);
System.out.println(messageQueues);
int maxNums = 10;//每次拉取消息的最大数量
while (true)
boolean found = false;
for (MessageQueue messageQueue : messageQueues)
long offset = consumer.fetchConsumeOffset(messageQueue, false);
PullResult pullResult = consumer.pull(messageQueue, "*", offset, maxNums);
switch (pullResult.getPullStatus())
case FOUND:
found = true;
List<MessageExt> msgs = pullResult.getMsgFoundList();
System.out.println(messageQueue.getQueueId() + "收到了消息,数量----" + msgs.size());
for (MessageExt msg : msgs)
System.out.println(messageQueue.getQueueId() + "处理消息——" + msg.getMsgId());
long nextOffset = pullResult.getNextBeginOffset();
consumer.updateConsumeOffset(messageQueue, nextOffset);
break;
case NO_NEW_MSG:
System.out.println("没有新消息");
break;
case NO_MATCHED_MSG:
System.out.println("没有匹配的消息");
break;
case OFFSET_ILLEGAL:
System.err.println("offset错误");
break;
if (!found) //没有一个队列中有新消息,则暂停一会。
TimeUnit.MILLISECONDS.sleep(5000);
我们的消费者其实已经默认可以消费该重试主题的消息了。没必要像上面这样再写一个单独的消费过程。
推模式消费
之前介绍的都是使用推模式消费,使用推模式消费时消息不是真的由Broker推送过来的,它底层还是使用的拉模式,拉取到了消息后就调用回调方法进行消息消费,根据回调方法返回的状态决定是否需要丢回Broker进行下一次消费。笔者也比较喜欢推模式消费,因为推模式消费的API比较简单,拉模式消费的很多逻辑都已经被封装好了。推模式消费使用的是DefaultMQPushConsumer,比如下面这样就是通过推模式进行消费,通过subscribe()
指定了需要订阅的Topic和消息Tag。通过registerMessageListener()
注册了收到消息后需要进行消息消费的监听器,可选的有并发消费和顺序消费,下面使用的是并发消费。
@Test
public void testPushConsumer() throws Exception
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_push");
consumer.setNamesrvAddr(this.nameServer);
consumer.subscribe("topic1", "tag8");
consumer.registerMessageListener(new MessageListenerConcurrently()
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context)
System.out.println("收到消息——" + msgs.get(0).getMsgId());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
consumer.start();
TimeUnit.MINUTES.sleep(10);
consumer.shutdown();
DefaultMQPushConsumer默认会不间断的拉取消息,可以通过setPullInterval()
指定间隔时间,单位是毫秒。拉取的消息会在本地队列中存一份,当该队列中的消息量超过了1000后就不会再拉消息了,这个1000是默认值,可以通过setPullThresholdForQueue()
设置。DefaultMQPushConsumer拉取回来的消息会丢到一个线程池中进行消费,线程池的最小线程数默认是20,最大线程数是64,可以通过setConsumeThreadMin()
设置最小线程数,通过setConsumeThreadMax()
设置最大线程数。DefaultMQPushConsumer启动后默认会从消费者上次消费的位置开始消费,如果是一个新的消费者组,则会从消息队列中未过期的第一条消息开始消费。可以通过setConsumeFromWhere()
改变新的消费者组消费消息的位置,默认值是CONSUME_FROM_LAST_OFFSET
,可选值还有CONSUME_FROM_FIRST_OFFSET
和CONSUME_FROM_TIMESTAMP
。当选择的消费位置是CONSUME_FROM_TIMESTAMP
时,默认是从30分钟以前开始消费。可以通过setConsumeTimestamp()
进行设置,格式是yyyyMMddHHmmss
,即精确到秒。每个消费者线程的超时时间是15分钟,可以通过setConsumeTimeout()
设置,单位是分钟。对于消费失败的消息,丢回给Broker后最多能再次消费16次,且每次重新消费的间隔时间都比上次长,重新消费的时间使用的是内部的delayLevel机制,每次加1。RocketMQ的delayLevel共18个取值,具体为1s、5s、10s、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h。可以通过setMaxReconsumeTimes()
指定消费失败的消息可重新消费的最大次数。
(注:本文是基于RocketMQ4.5.0所写)
以上是关于RocketMQ(11)——消费者拉模式和推模式的主要内容,如果未能解决你的问题,请参考以下文章
深度挖掘 RocketMQ底层源码「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
深度挖掘RocketMQ底层源码「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)