RocketMQ是是如何管理消费进度的?又是如何保证消息成功消费的?
Posted 博学谷狂野架构师
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ是是如何管理消费进度的?又是如何保证消息成功消费的?相关的知识,希望对你有一定的参考价值。
RocketMQ消费者保障
消息确认机制
consumer的每个实例是靠队列分配来决定如何消费消息的。那么消费进度具体是如何管理的,又是如何保证消息成功消费的?(RocketMQ有保证消息肯定消费成功的特性,失败则重试)
什么是ACK
消息确认机制
在实际使用RocketMQ的时候我们并不能保证每次发送的消息都刚好能被消费者一次性正常消费成功,可能会存在需要多次消费才能成功或者一直消费失败的情况,那作为发送者该做如何处理呢?
为了保证数据不被丢失,RocketMQ支持消息确认机制,即ack。发送者为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。
保证数据能被正确处理而不仅仅是被Consumer收到,我们就不能采用no-ack或者auto-ack,我们需要手动ack(manual-ack)。在数据处理完成后手动发送ack,这个时候Server才将Message删除。
RocketMQ ACK
由于以上工作所有的机制都实现在PushConsumer中,所以本文的原理均只适用于RocketMQ中的PushConsumer即Java客户端中的DefaultPushConsumer
。 若使用了PullConsumer模式,类似的工作如何ack,如何保证消费等均需要使用方自己实现。
注:广播消费和集群消费的处理有部分区别,以下均特指集群消费(CLSUTER),广播(BROADCASTING)下部分可能不适用。
保证消费成功
PushConsumer为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。
代码示例
消费的时候,我们需要注入一个消费回调,具体sample代码如下:
COPYconsumer.registerMessageListener(new MessageListenerConcurrently()
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context)
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
execute();//执行真正消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
业务实现消费回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS
,RocketMQ才会认为这批消息(默认是1条)是消费完成的。
如果这时候消息消费失败,例如数据库异常,余额不足扣款失败等一切业务认为消息需要重试的场景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER
,RocketMQ就会认为这批消息消费失败了。
为了保证消息是肯定被至少消费成功一次,RocketMQ会把这批消息重发回Broker(topic不是原topic而是这个消费租的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列。应用可以监控死信队列来做人工干预。
ACK进度保存
启动的时候从哪里消费
当新实例启动的时候,PushConsumer会拿到本消费组broker已经记录好的消费进度(consumer offset),按照这个进度发起自己的第一次Pull请求。
如果这个消费进度在Broker并没有存储起来,证明这个是一个全新的消费组,这时候客户端有几个策略可以选择:
COPYCONSUME_FROM_LAST_OFFSET //默认策略,从该队列最尾开始消费,即跳过历史消息
CONSUME_FROM_FIRST_OFFSET //从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
CONSUME_FROM_TIMESTAMP//从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
所以,社区中经常有人问:“为什么我设了CONSUME_FROM_LAST_OFFSET
,历史的消息还是被消费了”? 原因就在于只有全新的消费组才会使用到这些策略,老的消费组都是按已经存储过的消费进度继续消费。
对于老消费组想跳过历史消息需要自身做过滤,或者使用先修改消费进度
消息ACK消费进度
RocketMQ是以consumer group+queue为单位是管理消费进度的,以一个consumer offset标记这个这个消费组在这条queue上的消费进度。
如果某已存在的消费组出现了新消费实例的时候,依靠这个组的消费进度,就可以判断第一次是从哪里开始拉取的,每次消息成功后,本地的消费进度会被更新,然后由定时器定时同步到broker,以此持久化消费进度。
但是每次记录消费进度的时候,只会把一批消息中最小的offset值为消费进度值,如下图:
这钟方式和传统的一条message单独ack的方式有本质的区别。性能上提升的同时,会带来一个潜在的重复问题——由于消费进度只是记录了一个下标,就可能出现拉取了100条消息如 2101-2200的消息,后面99条都消费结束了,只有2101消费一直没有结束的情况。
在这种情况下,RocketMQ为了保证消息肯定被消费成功,消费进度职能维持在2101,直到2101也消费结束了,本地的消费进度才能标记2200消费结束了(注:consumerOffset=2201)。
重复消费
在这种设计下,就有消费大量重复的风险。如2101在还没有消费完成的时候消费实例突然退出(机器断电,或者被kill)。这条queue的消费进度还是维持在2101,当queue重新分配给新的实例的时候,新的实例从broker上拿到的消费进度还是维持在2101,这时候就会又从2101开始消费,2102-2200这批消息实际上已经被消费过还是会投递一次。
对于这个场景,RocketMQ暂时无能为力,所以业务必须要保证消息消费的幂等性,这也是RocketMQ官方多次强调的态度。
实际上,从源码的角度上看,RocketMQ可能是考虑过这个问题的,截止到3.2.6的版本的源码中,可以看到为了缓解这个问题的影响面,DefaultMQPushConsumer
中有个配置consumeConcurrentlyMaxSpan
COPY/**
* Concurrently max span offset.it has no effect on sequential consumption
*/
private int consumeConcurrentlyMaxSpan = 2000;
这个值默认是2000,当RocketMQ发现本地缓存的消息的最大值-最小值差距大于这个值(2000)的时候,会触发流控——也就是说如果头尾都卡住了部分消息,达到了这个阈值就不再拉取消息。
但作用实际很有限,像刚刚这个例子,2101的消费是死循环,其他消费非常正常的话,是无能为力的。一旦退出,在不人工干预的情况下,2101后所有消息全部重复!
Ack卡进度解决方案
实际上对于卡住进度的场景,可以选择弃车保帅的方案:把消息卡住那些消息,先ack掉,让进度前移。但要保证这条消息不会因此丢失,ack之前要把消息sendBack回去,这样这条卡住的消息就会必然重复,但会解决潜在的大量重复的场景。 这也是我们公司自己定制的解决方案。
部分源码如下:
COPYclass ConsumeRequestWithUnAck implements Runnable
final ConsumeRequest consumeRequest;
final long resendAfterIfStillUnAck;//n毫秒没有消费完,就重发
ConsumeRequestWithUnAck(ConsumeRequest consumeRequest,long resendAfterIfStillUnAck)
this.consumeRequest = consumeRequest;
this.resendAfterIfStillUnAck = resendAfterIfStillUnAck;
@Override
public void run()
//每次消费前,计划延时任务,超时则ack并重发
final WeakReference<ConsumeRequest> crReff = new WeakReference<>(this.consumeRequest);
ScheduledFuture scheduledFuture=null;
if(!ConsumeDispatcher.this.ackAndResendScheduler.isShutdown())
scheduledFuture= ConsumeDispatcher.this.ackAndResendScheduler.schedule(new ConsumeTooLongChecker(crReff),resendAfterIfStillUnAck,TimeUnit.MILLISECONDS);
try
this.consumeRequest.run();//正常执行并更新offset
finally
if (scheduledFuture != null) scheduledFuture.cancel(false);//消费结束后,取消任务
-
定义了一个装饰器,把原来的ConsumeRequest对象包了一层。
-
装饰器中,每条消息消费前都会调度一个调度器,定时触发,触发的时候如果发现消息还存在,就执行sendback并ack的操作。
后来RocketMQ显然也发现了这个问题,RocketMQ在3.5.8之后也是采用这样的方案去解决这个问题。只是实现方式上有所不同(事实上我认为RocketMQ的方案还不够完善)
-
在pushConsumer中 有一个
consumeTimeout
字段(默认15分钟),用于设置最大的消费超时时间。消费前会记录一个消费的开始时间,后面用于比对。 -
消费者启动的时候,会定期扫描所有消费的消息,达到这个timeout的那些消息,就会触发sendBack并ack的操作。这里扫描的间隔也是consumeTimeout(单位分钟)的间隔。
核心源码如下:
COPY//ConsumeMessageConcurrentlyService.java
public void start()
this.CleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable()
@Override
public void run()
cleanExpireMsg();
, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
//ConsumeMessageConcurrentlyService.java
private void cleanExpireMsg()
Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();
while (it.hasNext())
Map.Entry<MessageQueue, ProcessQueue> next = it.next();
ProcessQueue pq = next.getValue();
pq.cleanExpiredMsg(this.defaultMQPushConsumer);
//ProcessQueue.java
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer)
if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly())
return;
int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
for (int i = 0; i < loop; i++)
MessageExt msg = null;
try
this.lockTreeMap.readLock().lockInterruptibly();
try
if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000)
msg = msgTreeMap.firstEntry().getValue();
else
break;
finally
this.lockTreeMap.readLock().unlock();
catch (InterruptedException e)
log.error("getExpiredMsg exception", e);
try
pushConsumer.sendMessageBack(msg, 3);
log.info("send expire msg back. topic=, msgId=, storeHost=, queueId=, queueOffset=", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
try
this.lockTreeMap.writeLock().lockInterruptibly();
try
if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey())
try
msgTreeMap.remove(msgTreeMap.firstKey());
catch (Exception e)
log.error("send expired msg exception", e);
finally
this.lockTreeMap.writeLock().unlock();
catch (InterruptedException e)
log.error("getExpiredMsg exception", e);
catch (Exception e)
log.error("send expired msg exception", e);
通过这个逻辑对比我定制的时间,可以看出有几个不太完善的问题:
- 消费timeout的时间非常不精确。由于扫描的间隔是15分钟,所以实际上触发的时候,消息是有可能卡住了接近30分钟(15*2)才被清理。
- 由于定时器一启动就开始调度了,中途这个consumeTimeout再更新也不会生效。
消息重试
顺序消息的重试
对于顺序消息,当消费者消费消息失败后,消息队列RocketMQ版会自动不断地进行消息重试(每次间隔时间为1秒),这时,应用会出现消息消费被阻塞的情况。因此,建议您使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
无序消息的重试
对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。
无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
注意 以下内容都只针对无序消息生效。
重试次数
消息队列RocketMQ版默认允许每条消息最多重试16次,每次重试的间隔时间如下。
第几次重试 | 与上次重试的间隔时间 | 第几次重试 | 与上次重试的间隔时间 |
---|---|---|---|
1 | 10秒 | 9 | 7分钟 |
2 | 30秒 | 10 | 8分钟 |
3 | 1分钟 | 11 | 9分钟 |
4 | 2分钟 | 12 | 10分钟 |
5 | 3分钟 | 13 | 20分钟 |
6 | 4分钟 | 14 | 30分钟 |
7 | 5分钟 | 15 | 1小时 |
8 | 6分钟 | 16 | 2小时 |
如果消息重试16次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的4小时46分钟之内进行16次重试,超过这个时间范围消息将不再重试投递。
注意 一条消息无论重试多少次,这些重试消息的Message ID不会改变。
和生产端重试区别
消费者和生产者的重试还是有区别的,主要有两点
- 默认重试次数:Product默认是2次,而Consumer默认是16次。
- 重试时间间隔:Product是立刻重试,而Consumer是有一定时间间隔的。它照
1S,5S,10S,30S,1M,2M····2H
进行重试。 - Product在异步情况重试失效,而对于Consumer在广播情况下重试失效。
配置方式
重试配置方式
消费失败后,重试配置方式,集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):
- 方式1:返回RECONSUME_LATER(推荐)
- 方式2:返回Null
- 方式3:抛出异常
示例代码
COPY//注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently()
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context)
//消息处理逻辑抛出异常,消息将重试。
doConsumeMessage(list);
//方式1:返回ConsumeConcurrentlyStatus.RECONSUME_LATER,消息将重试。
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
//方式2:返回null,消息将重试。
// return null;
//方式3:直接抛出异常,消息将重试。
// throw new RuntimeException("Consumer Message exception");
);
无需重试的配置方式
集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回Action.CommitMessage,此后这条消息将不会再重试。
示例代码
COPY//注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently()
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context)
//消息处理逻辑抛出异常,消息将重试。
try
doConsumeMessage(list);
catch (Exception e)
//捕获消费逻辑中的所有异常,并返回Action.CommitMessage;
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
//业务方正常消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
获取消息重试次数
消费者收到消息后,可按照以下方式获取消息的重试次数:
COPY//注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently()
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context)
doConsumeMessage(list);
//获取消息重试次数
int retryTimes = list.get(0).getReconsumeTimes();
//业务方正常消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
效果演示
消费端只发送一条消息进行测试重试
消费端主动拒绝
演示代码
COPYpublic class RetryConsumer
public static void main(String[] args) throws Exception
//创建一个消息消费者,并设置一个消息消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocket_test_consumer_group");
//指定 NameServer 地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//设置 Consumer 第一次启动时从队列头部开始消费还是队列尾部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅指定 Topic 下的所有消息
consumer.subscribe("topicTest", "*");
//注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently()
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context)
if (list != null)
for (MessageExt ext : list)
//获取消息重试次数
int retryTimes = ext.getReconsumeTimes();
try
String message = new String(ext.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("Consumer-线程名称=[" + Thread.currentThread().getId() + "],消息重试次数:[" + retryTimes + "],接收时间:[" + new Date().getTime() + "],消息=[" + message + "]");
catch (UnsupportedEncodingException e)
e.printStackTrace();
//业务主动拒绝消息
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
);
// 消费者对象在使用之前必须要调用 start 初始化
consumer.start();
System.out.println("消息消费者已启动");
重试消息
消费端返回
ConsumeConcurrentlyStatus.RECONSUME_LATER
,消费端就会不断的重试。
COPYConsumer-线程名称=[35],消息重试次数:[0],接收时间:[1608117780264],消息=[Hello Java demo RocketMQ ]
Consumer-线程名称=[36],消息重试次数:[1],接收时间:[1608117790840],消息=[Hello Java demo RocketMQ ]
Consumer-线程名称=[37],消息重试次数:[2],接收时间:[1608117820876],消息=[Hello Java demo RocketMQ ]
异常重试
演示代码
这里的代码意思很明显: 主动抛出一个异常,然后如果超过3次,那么就不继续重试下去,而是将该条记录保存到数据库由人工来兜底。
COPYpublic class RetryConsumer
public static void main(String[] args) throws Exception
//创建一个消息消费者,并设置一个消息消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocket_test_consumer_group");
//指定 NameServer 地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//设置 Consumer 第一次启动时从队列头部开始消费还是队列尾部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅指定 Topic 下的所有消息
consumer.subscribe("topicTest", "*");
//注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently()
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context)
if (list != null)
for (MessageExt ext : list)
//获取消息重试次数
int retryTimes = ext.getReconsumeTimes();
try
String message = new String(ext.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("Consumer-线程名称=[" + Thread.currentThread().getId() + "],消息重试次数:[" + retryTimes + "],接收时间:[" + new Date().getTime() + "],消息=[" + message + "]");
//这里设置重试大于3次 那么通过保存数据库 人工来兜底
if (retryTimes >= 2)
System.out.println("该消息已经重试3次,保存数据库。topic=[" + ext.getTags() + "],keys=[" + ext.getKeys() + "],msg=[" + message + "]");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
catch (UnsupportedEncodingException e)
e.printStackTrace();
//主动抛出异常
throw new RuntimeException("=======这里出错了============");
//业务方正常消费
//return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
// 消费者对象在使用之前必须要调用 start 初始化
consumer.start();
System.out.println("消息消费者已启动");
重试消息
COPYConsumer-线程名称=[36],消息重试次数:[0],接收时间:[1608118304600],消息=[Hello Java demo RocketMQ ]
Consumer-线程名称=[37],消息重试次数:[1],接收时间:[1608118315165],消息=[Hello Java demo RocketMQ ]
Consumer-线程名称=[38],消息重试次数:[2],接收时间:[1608118345191],消息=[Hello Java demo RocketMQ ]
该消息已经重试3次,保存数据库。topic=[TagA],keys=[null],msg=[Hello Java demo RocketMQ ]
超时重试
这里的超时异常并非真正意义上的超时,它指的是指获取消息后,因为某种原因没有给RocketMQ返回消费的状态,即没有
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS
或return ConsumeConcurrentlyStatus.RECONSUME_LATER
那么 RocketMQ会认为该消息没有发送,会一直发送。因为它会认为该消息根本就没有发送给消费者,所以肯定没消费。
演示代码
COPYpublic class RetryConsumer
public static void main(String[] args) throws Exception
//创建一个消息消费者,并设置一个消息消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocket_test_consumer_group");
//指定 NameServer 地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//设置 Consumer 第一次启动时从队列头部开始消费还是队列尾部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅指定 Topic 下的所有消息
consumer.subscribe("topicTest", "*");
//注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently()
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context)
if (list != null)
for (MessageExt ext : list)
//获取消息重试次数
int retryTimes = ext.getReconsumeTimes();
try
String message = new String(ext.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("Consumer-线程名称=[" + Thread.currentThread().getId() + "],消息重试次数:[" + retryTimes + "],接收时间:[" + new Date().getTime() + "],消息=[" + message + "]");
catch (UnsupportedEncodingException e)
e.printStackTrace();
//这里睡眠60秒
try
TimeUnit.SECONDS.sleep(60);
catch (InterruptedException e)
e.printStackTrace();
System.out.println("休眠60秒 看还能不能走到这里...");
// 业务方正常消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
// 消费者对象在使用之前必须要调用 start 初始化
consumer.start();
System.out.println("消息消费者已启动");
重试消息
当获得 当前消费重试次数为 = 0 后 , 关掉该进程。再重新启动该进程,那么依然能够获取该条消息
COPYConsumer-线程名称=[33],消息重试次数:[0],接收时间:[1608118652598],消息=[Hello Java demo RocketMQ ]
重启消费者
COPYConsumer-线程名称=[28],消息重试次数:[0],接收时间:[1608118683304],消息=[Hello Java demo RocketMQ ]
休眠60秒 看还能不能走到这里...
重试消息的处理
一般情况下我们在实际生产中是不需要重试16次,这样既浪费时间又浪费性能,理论上当尝试重复次数达到我们想要的结果时如果还是消费失败,那么我们需要将对应的消息进行记录,并且结束重复尝试。
COPYpublic class RetryConsumer
public static void main(String[] args) throws Exception
//创建一个消息消费者,并设置一个消息消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocket_test_consumer_group");
//指定 NameServer 地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//设置 Consumer 第一次启动时从队列头部开始消费还是队列尾部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅指定 Topic 下的所有消息
consumer.subscribe("topicTest", "*");
//注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently()
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context)
if (list != null)
for (MessageExt ext : list)
//获取消息重试次数
int retryTimes = ext.getReconsumeTimes();
try
String message = new String(ext.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("Consumer-线程名称=[" + Thread.currentThread().getId() + "],消息重试次数:[" + retryTimes + "],接收时间:[" + new Date().getTime() + "],消息=[" + message + "]");
//这里设置重试大于3次 那么通过保存数据库 人工来兜底
if (retryTimes >= 2)
System.out.println("该消息已经重试3次,保存数据库。topic=[" + ext.getTags() + "],keys=[" + ext.getKeys() + "],msg=[" + message + "]");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
catch (UnsupportedEncodingException e)
e.printStackTrace();
//主动抛出异常
throw new RuntimeException("=======这里出错了============");
//业务方正常消费
//return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
// 消费者对象在使用之前必须要调用 start 初始化
consumer.start();
System.out.println("消息消费者已启动");
所以任何异常都要捕获返回ConsumeConcurrentlyStatus.RECONSUME_LATER,rocketmq会放到重试队列,这个重试TOPIC的名字是%RETRY%+consumergroup的名字,如下图:
注意点
- 如果业务的回调没有处理好而抛出异常,会认为是消费失败当ConsumeConcurrentlyStatus.RECONSUME_LATER处理。
- 当使用顺序消费的回调MessageListenerOrderly时,由于顺序消费是要前者消费成功才能继续消费,所以没有ConsumeConcurrentlyStatus.RECONSUME_LATER的这个状态,只有ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT来暂停队列的其余消费,直到原消息不断重试成功为止才能继续消费。
RocketMQ重试流程
重试队列与死信队列
在介绍RocketMQ的消费重试机制之前,需要先来说下“重试队列”和“死信队列”两个概念。
重试队列
如果Consumer端因为各种类型异常导致本次消费失败,为防止该消息丢失而需要将其重新回发给Broker端保存,保存这种因为异常无法正常消费而回发给MQ的消息队列称之为重试队列。RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。
死信队列
由于有些原因导致Consumer端长时间的无法正常消费从Broker端Pull过来的业务消息,为了确保消息不会被无故的丢弃,那么超过配置的“最大重试消费次数”后就会移入到这个死信队列中。在RocketMQ中,SubscriptionGroupConfig配置常量默认地设置了两个参数,一个是retryQueueNums为1(重试队列数量为1个),另外一个是retryMaxTimes为16(最大重试消费的次数为16次)。Broker端通过校验判断,如果超过了最大重试消费次数则会将消息移至这里所说的死信队列。这里,RocketMQ会为每个消费组都设置一个Topic命名为“%DLQ%+consumerGroup”的死信队列。一般在实际应用中,移入至死信队列的消息,需要人工干预处理;
Consumer端回发消息至Broker端
在业务工程中的Consumer端(Push消费模式下),如果消息能够正常消费需要在注册的消息监听回调方法中返回CONSUME_SUCCESS的消费状态,否则因为各类异常消费失败则返回RECONSUME_LATER的消费状态。消费状态的枚举类型如下所示:
COPYpublic enum ConsumeConcurrentlyStatus
//业务方消费成功
CONSUME_SUCCESS,
//业务方消费失败,之后进行重新尝试消费
RECONSUME_LATER;
如果业务工程对消息消费失败了,那么则会抛出异常并且返回这里的RECONSUME_LATER状态。这里,在消费消息的服务线程—consumeMessageService中,将封装好的消息消费任务ConsumeRequest提交至线程池—consumeExecutor异步执行。从消息消费任务ConsumeRequest的run()方法中会执行业务工程中注册的消息监听回调方法,并在processConsumeResult方法中根据业务工程返回的状态(CONSUME_SUCCESS或者RECONSUME_LATER)进行判断和做对应的处理。
CONSUME_SUCCESS
业务方正常消费
正常情况下,设置ackIndex的值为consumeRequest.getMsgs().size() - 1,因此后面的遍历consumeRequest.getMsgs()消息集合条件不成立,不会调用回发消费失败消息至Broker端的方法—sendMessageBack(msg, context)。最后,更新消费的偏移量;
RECONSUME_LATER
业务方消费失败
异常情况下,设置ackIndex的值为-1,这时就会进入到遍历consumeRequest.getMsgs()消息集合的for循环中,执行回发消息的方法—sendMessageBack(msg, context)。这里,首先会根据brokerName得到Broker端的地址信息,然后通过网络通信的Remoting模块发送RPC请求到指定的Broker上,如果上述过程失败,则创建一条新的消息重新发送给Broker,此时新消息的Topic为“%RETRY%+ConsumeGroupName”—重试队列的主题。其中,在MQClientAPIImpl实例的consumerSendMessageBack()方法中封装了ConsumerSendMsgBackRequestHeader的请求体,随后完成回发消费失败消息的RPC通信请求(业务请求码为:CONSUMER_SEND_MSG_BACK)。倘若上面的回发消息流程失败,则会延迟5S后重新在Consumer端进行重新消费。与正常消费的情况一样,在最后更新消费的偏移量;
Broker端对于回发消息处理的主要流程
Broker端收到这条Consumer端回发过来的消息后,通过业务请求码(CONSUMER_SEND_MSG_BACK)匹配业务处理器—SendMessageProcessor来处理。在完成一系列的前置校验(这里主要是“消费分组是否存在”、“检查Broker是否有写入权限”、“检查重试队列数是否大于0”等)后,尝试获取重试队列的TopicConfig对象(如果是第一次无法获取到,则调用createTopicInSendMessageBackMethod()方法进行创建)。根据回发过来的消息偏移量尝试从commitlog日志文件中查询消息内容,若不存在则返回异常错误。
然后,设置重试队列的Topic—“%RETRY%+consumerGroup”至MessageExt的扩展属性“RETRY_TOPIC”中,并对根据延迟级别delayLevel和最大重试消费次数maxReconsumeTimes进行判断,如果超过最大重试消费次数(默认16次),则会创建死信队列的TopicConfig对象(用于后面将回发过来的消息移入死信队列)。在构建完成需要落盘的MessageExtBrokerInner对象后,调用“commitLog.putMessage(msg)”方法做消息持久化。这里,需要注意的是,在putMessage(msg)的方法里会使用“SCHEDULE_TOPIC_XXXX”和对应的延迟级别队列Id分别替换MessageExtBrokerInner对象的Topic和QueueId属性值,并将原来设置的重试队列主题(“%RETRY%+consumerGroup”)的Topic和QueueId属性值做一个备份分别存入扩展属性properties的“REAL_TOPIC”和“REAL_QID”属性中。看到这里也就大致明白了,回发给Broker端的消费失败的消息并非直接保存至重试队列中,而是会先存至Topic为“SCHEDULE_TOPIC_XXXX”的定时延迟队列中。
疑问:上面说了RocketMQ的重试队列的Topic是“%RETRY%+consumerGroup”,为啥这里要保存至Topic是“SCHEDULE_TOPIC_XXXX”的这个延迟队列中呢?
在源码中搜索下关键字—“SCHEDULE_TOPIC_XXXX”,会发现Broker端还存在着一个后台服务线程—ScheduleMessageService(通过消息存储服务—DefaultMessageStore启动),通过查看源码可以知道其中有一个DeliverDelayedMessageTimerTask定时任务线程会根据Topic(“SCHEDULE_TOPIC_XXXX”)与QueueId,先查到逻辑消费队列ConsumeQueue,然后根据偏移量,找到ConsumeQueue中的内存映射对象,从commitlog日志中找到消息对象MessageExt,并做一个消息体的转换(messageTimeup()方法,由定时延迟队列消息转化为重试队列的消息),再次做持久化落盘,这时候才会真正的保存至重试队列中。看到这里就可以解释上面的疑问了,定时延迟队列只是为了用于暂存的,然后延迟一段时间再将消息移入至重试队列中。RocketMQ设定不同的延时级别delayLevel,并且与定时延迟队列相对应,具体源码如下:
COPY//省略
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
/**
* 定时延时消息主题的队列与延迟等级对应关系
* @param delayLevel
* @return
*/
public static int delayLevel2QueueId(final int delayLevel)
return delayLevel - 1;
Consumer端消费重试机制
每个Consumer实例在启动的时候就默认订阅了该消费组的重试队列主题,DefaultMQPushConsumerImpl的copySubscription()方法中的相关代码如下:
COPYprivate void copySubscription() throws MQClientException
//省略其他代码...
switch (this.defaultMQPushConsumer.getMessageModel())
case BROADCASTING:
break;
case CLUSTERING://如果消息消费模式为集群模式,还需要为该消费组对应一个重试主题
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
//省略其他代码...
因此,这里也就清楚了,Consumer端会一直订阅该重试队列主题的消息,向Broker端发送如下的拉取消息的PullRequest请求,以尝试重新再次消费重试队列中积压的消息。
COPYPullRequest [consumerGroup=CID_JODIE_1, messageQueue=MessageQueue [topic=%RETRY%CID_JODIE_1, brokerName=HQSKCJJIDRRD6KC, queueId=0], nextOffset=51]
最后,给出一张RocketMQ消息重试机制的框图(ps:这里只是描述了消息消费失败后重试拉取的部分重要过程):
本文由
传智教育博学谷狂野架构师
教研团队发布。如果本文对您有帮助,欢迎
关注
和点赞
;如果您有任何建议也可留言评论
或私信
,您的支持是我坚持创作的动力。转载请注明出处!
RocketMQ广播消费本地Offset文件丢失问题探秘
文章目录
今天本来在用RocketMQ做一个大的功能改造,中间有个小问题随意跟了下源码,突然还发现一个小BUG。一通跟踪调试,虽然最后还是没有解决问题,但是受益匪浅。记录一下。
一、问题出发点
我们知道RocketMQ的消费者有两种消费模式MessageModel,一种是集群消费,一种是广播消费。这两种消费机制最本质的区别在于,集群消费是在Broker端保存各个ConsumeGroup的消费进度Offset,而广播消费则是在Consumer本地记录消费进度Offset。
集群消费用得比较多,之前也认真跟过源码。但是广播消费因为用得比较少,所以基本也跟你一样,背过一点八股文,也就没有太过关注。但是,今天一个偶然的业务场景用到了广播消费。当我想要去本地看一下Offset的记录时,却发现怎么也找不到。这到底是我哪个业务配置配错了?还是万能的八股文欺骗了我?这背后藏着什么样的秘密?
问题1:Offset的本地存储的目录如何进行定制?
在RocketMQ的原生API中,提供了DefaultLitePullConsumer和DefaultMQPushConsumer(还有一个DefaultMQPullConsumer已经过时了)。在这两个消费实例中,都可以设定一个MessageModel属性,这个属性有两个枚举值,MessageModel.BROADCASTING(广播消费)和MessageModel.CLUSTERING(集群消费)。对于广播消费的消费者,默认会在消费者程序所在的机器本地的$user.home/rocketmq_offset/$clientIp@DEFAULT/$group/offset.json文件中保存消费进度。
其中$部分问变量。
$user.home为系统所属用户目录。在windows下默认是C:\\Users\\$用户名。
$clientIp是消费者端的IP地址。
$group是消费者指定的所属消费者组。
这里就有了第一个问题:这个本地存储的目录如何进行定制?
问题2:rocketmq-spring-boot-starter插件如何配置这个本地存储目录?
在使用RocketMQ进行业务开发时,经常会用到SpringBoott框架来整合RocketMQ的客户端。于是,会使用到rocketmq提供的下面的Maven依赖包。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
然后,就可以使用配置或注解的方式来声明客户端,而不用再去接触RocketMQ的原生消费者API了。例如
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic",messageModel= MessageModel.BROADCASTING)
public class SpringConsumer implements RocketMQListener<String>
@Override
public void onMessage(String message)
System.out.println("Received message : "+ message);
工作起来没有问题,但是,当你沿着上面的思路,想要去本地找一找这个消费者的本地缓存时,你会发现一个问题,找不到本地Offset文件(不要怀疑,如果你是用windows开发,一定找不到)。
于是,我开始冒出另外的两个问题:**问题2:rocketmq-spring-boot-starter中有几种声明消费者的方式?**是不是我的使用方式不对?**问题3:rocketmq-spring-boot-starter中是如何指定本地目录地址的?**要怎么找到本地缓存?
什么?你还不会用RocketMQ,不知道我在说什么?试试看看我的RocketMQ教程。顺便支持鼓励一下。
二、问题原因分析
查问题的时候是忙忙碌碌瞎查的,现在回顾问题,就把思路整理一下,也便于你理解。
1、RocketMQ中是如何记录广播消费的本地Offset的?
原生的RocketMQ消费者API是基础,所以这个问题必须先梳理清楚。 以下分析RocketMQ源码用的是4.9.1版本
原生的RocketMQ提供了两个消费者对象,DefaultLitePullConsumer和DefaultMQPushConsumer(还有一个DefaultMQPullConsumer已经过时了)。在这两个消费实例中,都可以设定一个MessageModel属性,这个属性有两个枚举值,MessageModel.BROADCASTING和MessageModel.CLUSTERING。这个属性是怎么跟Offset文件存储对应上的呢?其实从服务的启动过程中就能很清晰的跟踪到。
1.1、看看DefaultMQPushConsumer。
他的start()启动方法中,会启动一个defaultMQPushConsumerImpl实例,而这个defaultMQPushConsumerImpl示例会调用一个同步的start()方法。这个start()是同步的,针对Offset文件存盘问题,是为了保证存盘的Offset文件,在内存中只有一份统一的副本。带着今天的问题,可以在这个start()方法中找到一段很刺眼的代码:
if (this.defaultMQPushConsumer.getOffsetStore() != null)
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
else
//从这里可以看出,广播模式与集群模式的最本质区别就是offset存储的地方不一样。
switch (this.defaultMQPushConsumer.getMessageModel())
//广播模式是在消费者本地存储offset
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
//集群模式是在Broker远端存储offset
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
这里就能看到广播模式和集群模式在Offset存储上的区别。
1.2:再来看看DefaultLitePullConsumer。
在他的启动过程中,类似的会创建一个defaultLitePullConsumerImpl对象。而在defaultLitePullConsumerImpl对象的启动过程中,会调用到一个initOffsetStore()方法。这个方法里的实现也是一样的刺眼:
private void initOffsetStore() throws MQClientException
if (this.defaultLitePullConsumer.getOffsetStore() != null)
this.offsetStore = this.defaultLitePullConsumer.getOffsetStore();
else
switch (this.defaultLitePullConsumer.getMessageModel())
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultLitePullConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultLitePullConsumer.getConsumerGroup());
break;
default:
break;
this.defaultLitePullConsumer.setOffsetStore(this.offsetStore);
this.offsetStore.load();
所以,问题自然就集中到了这个LocalFileOffsetStore中了。
1.3:看LocalFileOffsetStore是如何指定存储路径的
RocketMQ的源码中采用了充血模型的实现方式,所有关于LocalFileOffset的业务动作,都集中到了LocalFileOffsetStore.java当中。在他的构造方法当中就直接维护了一个storePath属性来维护本地存储地址。
public class LocalFileOffsetStore implements OffsetStore
//本地存储目录
public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
"rocketmq.client.localOffsetStoreDir",
System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
.....
public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName)
.....
//本地存储文件
this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator +
this.mQClientFactory.getClientId() + File.separator +
this.groupName + File.separator +
"offsets.json";
.....
}
先看LOCAL_OFFSET_STORE_DIR,这是本地存储的根目录。跟rocketmq.client.localOffsetStoreDir这个系统属性有关,**所以要定制本地存储的目录,只需要设定rocketmq.client.localOffsetStoreDir系统属性即可。**而这个系统属性还没有支持前端配置,所以,修改的方式,只能是在应用启动时手动进行指定。 例如 System.setProperty(“rocketmq.client.localOffsetStoreDir”,“D:/.rockemtq_offset”)。
然后,再来看offsets.json的具体存储路径storePath。看这个结构,就跟最开始直接指出的结论对应上了。但是这样其中还有一个变量,就是clinetId。这个属性是如何指定的呢?是不是可以定制呢?
这里面的mQClientFactory是一个MQClientInstance的实例对象。而MQClientInstance则是RocketMQ中对所有客户端的抽象。生产者和消费者最终都会交由MQClientInstance进行统一的服务启动。在MQClientInstance的start()方法中给所有的客户端定义了一个统一的启动标准,不同的客户端只要按照标准去注册不同的信息即可。而ClientId就是MQClientInstance在初始化的过程中指定的一个属性(初始化过程见MQClientManager.java#getOrCreateMQClientInstance方法)。ClientId的具体生成逻辑则是在ClientConfig对象中的buildMQClientId方法中。
//指定instanceName
private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
//构建clientId
public String buildMQClientId()
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName))
sb.append("@");
sb.append(this.unitName);
return sb.toString();
这里面又引出了一个变量instanceName,这个变量对于广播消息的本地Offset存储路径起了很重要的作用。
-
这个instanceName,默认值是DEFAULT。
-
可以由系统属性rocketmq.client.name来替代这个默认值,这个系统属性依然是没有配置属性定制的,只能手动修改。
-
这个instanceName在ClientConfig中还有对应的setter方法。可以由消费者客户端自行指定。而常用的RocketMQ客户端都是ClientConfig的子类,所以,他们都可以通过setter方法定制instanceName。
这个unitMode和unitName不太清楚具体干嘛的,unitMode默认是false,然后在源码中搜索了一下,好像也就在ClientConfig配置本地存储目录时用到了,在核心业务中并没有太多的作用。
github仓库中只有一条关于unitMode的issue,回复也是目前并没有做什么实际的工作。参见https://github.com/apache/rocketmq/issues/639
1.4:Offsets.json文件何时写入?
基于充血模型设计的好处在于,系统内的所有核心业务能力都在充血实体中统一整理,不需要去其他地方到处乱搜。对于Offsets.json文件的核心写入能力,也一起体现在了LocalFileOffsetStore类中。在LocalFileOffsetStore类中,有一个persistAll方法,实现了文件的写入。
public void persistAll(Set<MessageQueue> mqs)
if (null == mqs || mqs.isEmpty())
return;
OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet())
if (mqs.contains(entry.getKey()))
AtomicLong offset = entry.getValue();
offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
String jsonString = offsetSerializeWrapper.toJson(true);
if (jsonString != null)
try
MixAll.string2File(jsonString, this.storePath);
catch (IOException e)
log.error("persistAll consumer offset Exception, " + this.storePath, e);
在这个方法中可以看到,如果offsets.json文件写入失败,RocketMQ只是记录一条log日志就没事了,甚至连异常都没有往外抛。这意味着如果广播消息本地的offsets.json进度没有更新,RocketMQ不会做任何的补救措施。
不过,RocketMQ的客户端会启动一个线程,不断的尝试将这些offset偏移信息写入到文件当中,这也算是一种处理的方式把。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable()
@Override
public void run()
try
MQClientInstance.this.persistAllConsumerOffset();
catch (Exception e)
log.error("ScheduledTask persistAllConsumerOffset exception", e);
, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
1.5: 针对问题的总结
经过一通整理,回到之前提到的广播消息的offsets文件内容定制的问题,可以总结出这样几个结论:
- 消费者端存储广播消费的本地offsets文件的默认缓存目录是 System.getProperty(“user.home”) + File.separator + “.rocketmq_offsets” ,可以通过定制 rocketmq.client.localOffsetStoreDir 系统属性进行修改。
- 本地offsets文件在缓存目录中的具体位置与消费者的clientIp 和 instanceName有关。其中instanceName默认是DEFAULT,可以通过定制系统属性 rocketmq.client.name 进行修改。另外,每个消费者对象也可以单独设定instanceName。
- RocketMQ会通过定时任务不断尝试本地Offsets文件的写入,但是,如果本地Offsets文件写入失败,RocketMQ不会进行任何的不就,也就是说不会对业务有很大的影响。
2、rocketmq-spring-boot-starter如何创建RocketMQ的消费者?
接下来开始梳理rocketmq-spring-boot-starter对消费者端的封装。梳理封装的过程,其实也是在整理如何高效的使用RocketMQ的原生API,这一对于加深对于RocketMQ原生API的理解是非常有帮助的。如果你也尝试跟着这篇文章一起梳理源码,记得带上之前剔除的小问题。问题的本身相对比较简单,但是带上一个具体的问题去看源码,绝对会让你看源码的感觉不一样。
rocketmq-spring-boot-starter中在RocketMQ消费者这一块,封装了封装了两种模式,Push模式和Pull模式。
- Push模式就是通过@RocketMQMessageListener注解声明一个RocketMQListener接口的实现类来声明一个消费者。Broker推过来的消息,会自行进入其中的onMessage方法进行处理。
- Pull模式是通过内置的RocketMQTemplate对象的receive方法以及一系列的sendAndReceive方法,由消费者端主动去拉取消息。消息拉取过来后,可以通过里面的convert对象转换成所需要的结果对象。
- 在Pull模式中,如果一个restTemplate实例不够用,还可以通过@ExtRocketMQTemplateConfiguration和@ExtRocketMQConsumerConfiguration两个注解来注册额外的rocketmqTemplate实例。一个用来初始化rocketmqTemplate中的消息发送者。例如发送事务消息时,一个restTemplate只能对应一个事务监听逻辑。如果你的项目中有多个事务消息逻辑,就需要注册多个restTemplate实例,来对应不同的事务监听器。另一个用来初始化rocketmqTemplate中的消费者。
老规矩,如果使用还不太熟练,可以看下我的RocketMQ教程。支持鼓励一下。
接下来,就按Push模式和Pull模式分开来梳理。其中,Push模式是分析的重点,因为这是平常用得做多的一种模式。Pull模式用得比较少,就当做是补充的彩蛋了。
2.1、Push模式
Push模式对于@RocketMQMessageListener注解的处理方式,入口在rocketmq-spring-boot-2.2.1.jar中的org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration类中。
怎么找到的?评经验猜以及碰运气。
这个ListenerContainerConfiguration类继承了Spring当中的SmartInitializingSingleton接口,当Spring容器当中所有非懒加载的实例加载完成后,就会触发他的afterSingletonsInstantiated方法进行初始化。在这个方法中会去扫描所有带有注解@RocketMQMessageListener注解的类,将他注册到内部一个Container容器当中。
public void afterSingletonsInstantiated()
Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class)
.entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
beans.forEach(this::registerContainer);
这里这个Container可以认为是客户端实例的一个容器,通过这个容器来封装RocketMQ的原生API。
registerContainer的方法挺长的,我这里截取出跟今天的主题相关的几行重要的源码:
private void registerContainer(String beanName, Object bean)
.....
//获取Bean上面的注解
RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
...
//检查注解的配置情况
validate(annotation);
String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
counter.incrementAndGet());
GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
//将扫描到的注解转化成为Container,并注册到上下文中。
genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
() -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
DefaultRocketMQListenerContainer.class);
//启动容器,这里就相当于是启动了消费者
if (!container.isRunning())
try
container.start();
catch (Exception e)
log.error("Started container failed. ", container, e);
throw new RuntimeException(e);
log.info("Register the listener to container, listenerBeanName:, containerBeanName:", beanName, containerBeanName);
这其中最关注的,当然是创建容器的createRocketMQListenerContainer方法中。而在这个方法中,你基本看不到RocketMQ的原生API,都是在创建并维护一个DefaultRocketMQListenerContainer对象。而这个DefaultRocketMQListenerContainer类,就是我们今天关注的重点。
DefaultRocketMQListenerContainer类实现了InitializingBean接口,自然要先关注他的afterPropertiesSet方法。这是Spring提供的对象初始化的扩展机制。
public void afterPropertiesSet() throws Exception
initRocketMQPushConsumer();
this.messageType = getMessageType();
this.methodParameter = getMethodParameter();
log.debug("RocketMQ messageType: ", messageType);
这个方法就是用来初始化RocketMQ消费者的。在这个方法里就会创建一个RocketMQ原生的DefaultMQPushConsumer消费者。同样,方法很长,抽取出比较关注的重点源码。
private void initRocketMQPushConsumer() throws MQClientException
.....
//检查并创建consumer对象。
if (Objects.nonNull(rpcHook))
consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
enableMsgTrace, this.applicationContext.getEnvironment().
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
consumer.setVipChannelEnabled(false);
else
log.debug("Access-key or secret-key not configure in " + this + ".");
consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
this.applicationContext.getEnvironment().
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
// 定制instanceName,有没有很熟悉!!!
consumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));
.....
//设定广播消费还是集群消费。
switch (messageModel)
case BROADCASTING:
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
break;
case CLUSTERING:
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
break;
default:
throw new IllegalArgumentException("Property 'messageModel' was wrong.");
//维护消费者的其他属性。
...
//指定Consumer的消费监听 --》在消费以上是关于RocketMQ是是如何管理消费进度的?又是如何保证消息成功消费的?的主要内容,如果未能解决你的问题,请参考以下文章