RocketMQ源码(23)—DefaultMQPushConsumer消费者重试消息和死信消息源码
Posted 刘Java
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码(23)—DefaultMQPushConsumer消费者重试消息和死信消息源码相关的知识,希望对你有一定的参考价值。
消费重试:并发消费和顺序消费对于消费失败的消息均会有消息重试机制。此前已经学了相关的知识:RocketMQ的消费者消息重试和生产者消息重投,现在我们来看看他们的源码。
文章目录
1 并发消费重试
并发消费的重试与broker有关,需要借助重试队列。
1.1 失败重试
并发消费的消费结果通过ConsumeMessageConcurrentlyService#processConsumeResult方法处理。
对于消费失败的消息,广播模式下仅仅是对于消费失败的消息打印日志,并不会重试。集群模式下,则通过sendMessageBack方法处理消费失败的消息,将该消息重新发送至重试队列,延迟消费。
1.1.1 ConsumeMessageConcurrentlyService#sendMessageBack发送消费失败的消息
ConsumeMessageConcurrentlyService的方法,并发消费失败,发送消费失败的消息到broker。
从ConsumeConcurrentlyContext获取delayLevelWhenNextConsume属性作为延迟等级,默认为0。通过在业务方法中修改该属性的只可以控制延迟等级:
- -1,不重试,直接发往死信队列。
- 0,默认值,延迟等级broker端控制,默认从延迟等级level3开始,后续每次重试都是3 + 当前重试次数。
- 大于0,由client端控制,传入多少延迟等级就是多少。
注意,每次消费均产生一个新的ConsumeConcurrentlyContext对象,所以仅能设置单次发回消息的延迟等级。
/**
* ConsumeMessageConcurrentlyService的方法
* <p>
* 并发消费失败,发送消费失败的消息到broker
*
* @param msg 要发回的消息
* @param context 并发消费上下文
* @return 发送结果
*/
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context)
/*
* 从ConsumeConcurrentlyContext获取delayLevelWhenNextConsume属性作为延迟等级,默认为0。
* 通过在业务方法中修改该属性的只可以控制延迟等级:
* -1,不重试,直接发往死信队列。
* 0,默认值,延迟等级broker端控制的,默认从延迟等级level3开始,后续每次重试都是3 + 当前重试次数。
* 大于0,由client端控制,传入多少延迟等级就是多少。
*
* 注意,每次消费均产生一个新的ConsumeConcurrentlyContext对象,所以仅能设置单次发回消息的延迟等级
*/
int delayLevel = context.getDelayLevelWhenNextConsume();
//使用nameSpace包装topic
msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
try
//调用DefaultMQPushConsumerImpl#sendMessageBack方法发送消费失败的消息到broker
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
//没有抛出异常就算成功
return true;
catch (Exception e)
log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
return false;
1.1.2 DefaultMQPushConsumerImpl#sendMessageBack发送消费失败的消息
DefaultMQPushConsumerImpl的方法,发送消费失败的消息到broker。内部调用调用MQClientAPIImpl#consumerSendMessageBack方法发送消费失败的消息到broker。
/**
* DefaultMQPushConsumerImpl的方法
* <p>
* 发送消费失败的消息到broker
*
* @param msg 要发回的消息
* @param delayLevel 延迟等级
* @param brokerName brokerName
*/
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException
try
//获取broker地址
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
//调用MQClientAPIImpl#consumerSendMessageBack方法发送消费失败的消息到broker
//getMaxReconsumeTimes获取最大重试次数,通过DefaultMQPushConsumer.maxReconsumeTimes属性配置
//默认-1,表示默认重试16次
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
catch (Exception e)
//该方法抛出异常
log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
//构建一个新消息
Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
newMsg.setFlag(msg.getFlag());
MessageAccessor.setProperties(newMsg, msg.getProperties());
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
//设置延迟等级PROPERTY_DELAY_TIME_LEVEL属性,重试次数 + 3
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
//尝试通过普通send方法发送延迟消息
this.mQClientFactory.getDefaultMQProducer().send(newMsg);
finally
//解除nameSpace
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
1.1.1.2.1 getMaxReconsumeTimes获取最大重试次数
getMaxReconsumeTimes获取最大重试次数,通过DefaultMQPushConsumer.maxReconsumeTimes属性配置。默认-1,对于并发消费表示默认重试16次。
/**
* DefaultMQPushConsumerImpl的方法
* <p>
* 获取最大重试次数
*
* @return 最大重试次数
*/
private int getMaxReconsumeTimes()
// default reconsume times: 16
//通过DefaultMQPushConsumer.maxReconsumeTimes属性配置。默认-1,对于并发消费表示默认重试16次。
if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1)
return 16;
else
return this.defaultMQPushConsumer.getMaxReconsumeTimes();
1.1.3 consumerSendMessageBack发送消费失败的消息
MQClientAPIImpl的方法,通过netty发起远程调用,请求Code为CONSUMER_SEND_MSG_BACK,发送消费失败的消息到broker。
/**
* MQClientAPIImpl的方法
* <p>
* 发送消费失败的消息到broker
*
* @param addr broker地址
* @param msg 要发回的消息
* @param consumerGroup 消费者组
* @param delayLevel 延迟等级
* @param timeoutMillis 超时时间
* @param maxConsumeRetryTimes 最大重试次数,超过发往死信队列
*/
public void consumerSendMessageBack(
final String addr,
final MessageExt msg,
final String consumerGroup,
final int delayLevel,
final long timeoutMillis,
final int maxConsumeRetryTimes
) throws RemotingException, MQBrokerException, InterruptedException
//构建请求头
ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
//Code为CONSUMER_SEND_MSG_BACK
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
requestHeader.setGroup(consumerGroup);
requestHeader.setOriginTopic(msg.getTopic());
requestHeader.setOffset(msg.getCommitLogOffset());
requestHeader.setDelayLevel(delayLevel);
requestHeader.setOriginMsgId(msg.getMsgId());
requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
//同步调用
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode())
case ResponseCode.SUCCESS:
return;
default:
break;
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
1.2 超时重试
ConsumeMessageConcurrentlyService#start方法将会通过cleanExpireMsgExecutors定时任务清理过期的消息,启动后15min开始执行,后每15min执行一次,这里的15min是RocketMQ大的默认超时时间,可通过defaultMQPushConsumer#consumeTimeout属性设置。
通过一个定时任务,每隔15min检测一次,当消息消费时间超过15min时,将该消息算作消费失败,并且将该消息通过sendMessageBack发回broker延迟topic,将在给定延迟时间之后发回进行重试消费或者发往死信队列。
该定时任务最终通过ProcessQueue#cleanExpiredMsg方法处理超时消息,其源码我们在并发消费部分已经讲过了。下面是关键截图:
这个方法实际上是DefaultMQPushConsumer提供的一个公共方法,可以看到超时重试的每一次的重试延迟等级都是是固定level 3,即延迟10s。
内部调用DefaultMQPushConsumerImpl#sendMessageBack方法,我们在上面已经讲过了。
public void sendMessageBack(MessageExt msg, int delayLevel)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException
msg.setTopic(withNamespace(msg.getTopic()));
//通过DefaultMQPushConsumerImpl#sendMessageBack发送消费失败的消息,指定延迟等级
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null);
2 顺序消费重试
顺序消费的重试与broker无关,直接在本地延迟1s之后重新消费当前没有消费成功的消息。
2.1 失败重试
broker消费失败的重试在ConsumeMessageOrderlyService# processConsumeResult方法中实现,具体的源码我们在顺序消费部分已经讲过了。
当返回SUSPEND_CURRENT_QUEUE_A_MOMENT,表示消费失败,则调用checkReconsumeTimes方法校验是否达到最大重试次数,可以通过DefaultMQPushConsumer#maxReconsumeTimes属性配置,默认无上限,即Integer.MAX_VALUE。
checkReconsumeTimes方法将检查如果没有达到最大次数则返回true,否则将会调用sendMessageBack方法,将消息发回broker,但是不会再次被消费,而是直接被送往死信队列。
没有达到最大重试次数,那么会调用makeMessageToConsumeAgain方法标记消息等待再次消费,然后调用submitConsumeRequestLater方法延迟提交新的消费请求,默认suspendTimeMillis为-1,即延迟1s后重新消费。
所谓标记,实际上很简单,就是将需要重复消费消息从正在消费的consumingMsgOrderlyTreeMap中移除,然后重新存入待消费的msgTreeMap中,那么将会在随后的消费中被拉取,进而实现重复消费。
所以说,并发消费的重复消费,需要将消息发往broker的重试topic中,等待再次拉取并重新消费,而顺序消费的重复消费就更加简单了,直接在本地重试,不需要经过broker,直到达到了最大重试次数,才会通过sendMessageBack方法将消息发往broker,但是不会再被消费到了。
2.1.1 sendMessageBack发送失败消息
ConsumeMessageOrderlyService的sendMessageBack方法,将顺序消费重试次数达到最大值的消息构造为一个新的msg并且发往broker,topic为重试topic。
注意该方法实际上是调用同步发送消息的send方法,请求Code一般为SEND_REPLY_MESSAGE或者SEND_REPLY_MESSAGE_V2。
在broker端接收到该请求之后,会在handleRetryAndDLQ方法中判断到如果已重试次数大于等于最大重试次数,那么替换为死信topic,消息最终还是会发往死信队列。
/**
* ConsumeMessageOrderlyService的方法
* 顺序消费,将重试次数达到最大值的消息发往broker死信队列
*
* @param msg 发送的消息
* @return 是否发送成功
*/
public boolean sendMessageBack(final MessageExt msg)
try
// max reconsume times exceeded then send to dead letter queue.
//新构造一个msg
Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
newMsg.setFlag(msg.getFlag());
MessageAccessor.setProperties(newMsg, msg.getProperties());
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
//设置重试次数
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes()));
//设置最大重试次数,默认
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
//设置延迟等级PROPERTY_DELAY_TIME_LEVEL属性, 3 + 重试次数
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
//调用DefaultMQProducer#send方法发送消息
this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);
return true;
catch (Exception e)
log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
return false;
2.2 超时重试
对于顺序消费,实际上无论超时多久,无论在你的业务逻辑中卡多久,都不会单纯的因为15min的消费超时而重试。这也是为了保证顺序性的妥协,无论执行多久,你的程序终会返回最终结果,只需要根据返回的状态执行对应的逻辑即可。
3 broker处理回退请求
并发消费失败重试,以及顺序消费达到最大重试次数之后,都会向broekr发送消息发回请求,在broker端会进行判断,是继续延迟消费还是发往死信队列。
并发消费重试请求Code为CONSUMER_SEND_MSG_BACK,而顺序消费大道最大重试次数后发回broker的请求则是走的普通发送消息的请求。实际上,这两种请求,都算作是发送消息请求,在broker都通过SendMessageProcessor处理。
因此,这两个请求的broker处理的统一入口都是SendMessageProcessor#asyncProcessRequest方法,我们来看看该方法源码。
/**
* SendMessageProcessor的方法
* <p>
* 异步的处理请求
*/
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException
final SendMessageContext mqtraceContext;
/*
* 根据不同的请求code选择不同的处理方式
*/
switch (request.getCode())
//如果是消费者发送的消息回退请求,该请求用于实现消息重试
//如果消息消费失败,那么消息将被通过回退请求发送回broker,并延迟一段时间再消费
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.asyncConsumerSendMsgBack(ctx, request);
//其他情况,都是属于生产者发送消息的请求,统一处理
default:
//解析请求头
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null)
//如果请求头为null,那么返回一个null值结果
return CompletableFuture.completedFuture(null);
//构建发送请求消息轨迹上下文
mqtraceContext = buildMsgContext(ctx, requestHeader);
//执行发送消息前钩子方法
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
if (requestHeader.isBatch())
//处理批量发送消息逻辑
return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
else
//处理其他发送消息逻辑,例如单条消息
return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
可以看到,仅仅是对于CONSUMER_SEND_MSG_BACK请求有特殊处理,将会调用asyncConsumerSendMsgBack方法处理,这是我们主要关注的方法。
而其他请求则走通用处理逻辑,单个消息处理方法asyncSendMessage的逻辑源码我们在此前broker接收消息的部分已经讲过了,稍后着重讲一下里面的handleRetryAndDLQ方法。
3.1 asyncConsumerSendMsgBack处理回退请求
SendMessageProcessor#asyncConsumerSendMsgBack方法用于处理消息回退请求。大概步骤为:
- 前置校验。查找broker缓存的当前消费者组的订阅组配置SubscriptionGroupConfig,不存在订阅关系就直接返回,如果broker不支持写,那么直接返回。如果重试队列数量小于等于0,则直接返回,一般都是1。
- 根据consumerGroup获取对应的重试topic,这里仅仅是获取topic的名字%RETRY%+consumerGroup。随机选择一个重试队列id,一般都是0,因为重试队列数一般都是1。
- 调用createTopicInSendMessageBackMethod方法,尝试获取或者创建重试topic,其源码和创建普通topic差不多,区别就是重试topic不需要模板topic,默认读写队列数都是1,权限为读写,该方法的源码我们在broker接收消息的源码部分就已经解析了。如果创建重试topic失败,直接返回。重试topic没有写的权限,直接返回。
- 调用lookMessageByOffset方法,根据消息物理偏移量从commitLog中找到该条消息。将属性RETRY_TOPIC的设置到消息属性中,该属性值为正常的topic。
- 从请求头中获取延迟等级。从订阅关系中获取最大重试次数,如果版本大于3.4.9,那么从请求头中获取最大重试次数,这是客户端传递过来的。并发消费模式默认最大16,顺序消费默认最大Integer.MAX_VALUE。
- 如果消息已重试次数 大于等于 最大重试次数,或者延迟等级小于0,那么消息不再重试,消息将会直接发往死信队列。
- 获取该consumerGroup对应的死信队列topic,这里仅仅是获取topic的名字%DLQ%+consumerGroup。随机选择一个重试队列id,固定是0,因为死信队列数是1。
- 尝试获取或者创建死信topic,实际上调用的调用获取重试topic的createTopicInSendMessageBackMethod方法,默认读写队列数都是1,权限为读写。
- 设置消息延迟等级0,表示不会延迟,不进入延迟topic,直接发往死信队列。
- 如果没有达到最大重试次数,并且延迟等级不小于0,那么将会重试,因此设置延迟等级。
- 如果参数中的delayLevel = 0,表示broker控制延迟等级,那么delayLevel = 3 + 已重试的次数 ,即默认从level3开始,即从延迟10s开始。如果参数中的delayLevel > 0,表示consumer控制延迟等级,那么参数是多少,等级就设置为多少。
- 创建内部消息对象MessageExtBrokerInner,设置相关属性。注意这里,设置的topic为重试topic或者死信topic。设置 消费次数 + 1。
- 调用asyncPutMessage方法,以异步方式处理、存储消息,将消息存储到commitLog中。该方法的源码我们在broker接收消息部分已经讲解过了。
- 如果是延迟消息,即DelayTimeLevel大于0,那么替换topic为SCHEDULE_TOPIC_XXXX,替换queueId为延迟队列id, id = level - 1,如果延迟级别大于最大级别,则设置为最大级别18,,默认延迟2h。这些参数可以在broker端配置类MessageStoreConfig中配置。
- 最后保存真实topic到消息的REAL_TOPIC属性,保存queueId到消息的REAL_QID属性,方便后面恢复。注意这里,如果保存的topic可能是重试topic,而真正的topic保存在RETRY_TOPIC属性中。
- broker后台定时任务服务ScheduleMessageService按照对应的延迟时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中,然后即可被消费者重新消费。
/**
* SendMessageProcessor的方法
* <p>
* 处理CONSUMER_SEND_MSG_BACK发回请求
*/
private CompletableFuture<RemotingCommand> asyncConsumerSendMsgBack(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
//解析请求头
final ConsumerSendMsgBackRequestHeader requestHeader =
(ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());
//执行前置钩子
if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId()))
ConsumeMessageContext context = RocketMQ—消费者客户端详解