RocketMQ源码(20)—DefaultMQPushConsumer处理Broker的拉取消息响应源码
Posted 刘Java
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码(20)—DefaultMQPushConsumer处理Broker的拉取消息响应源码相关的知识,希望对你有一定的参考价值。
基于RocketMQ release-4.9.3,深入的介绍了DefaultMQPushConsumer处理Broker的拉取消息响应源码。
此前我们学习了Consumer如何发起的拉取消息请求,以及Broker如何处理拉取消息请求,现在我们来学习Consumer如何处理Broker的拉取消息响应的源码。
文章目录
1 客户端异步请求回调
此前我们讲过在consumer发起拉取消息请求的时候,通过ASYNC模式异步的进行拉取,并且InvokeCallback#operationComplete方法将会在得到结果之后进行回调,内部调用pullCallback的回调方法。
在回调方法中,如果解析到了响应结果,那么调用pullCallback#onSuccess方法处理,否则调用pullCallback#onException方法处理。
/**
* MQClientAPIImpl的方法
* 异步的拉取消息,并且触发回调函数
*
* @param addr broker地址
* @param request 请求命令对象
* @param timeoutMillis 消费者消息拉取超时时间,默认30s
* @param pullCallback 拉取到消息之后调用的回调函数
* @throws RemotingException
* @throws InterruptedException
*/
private void pullMessageAsync(
final String addr,
final RemotingCommand request,
final long timeoutMillis,
final PullCallback pullCallback
) throws RemotingException, InterruptedException
/*
* 基于netty给broker发送异步消息,设置一个InvokeCallback回调对象
*
* InvokeCallback#operationComplete方法将会在得到结果之后进行回调,内部调用pullCallback的回调方法
*/
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback()
/**
* 异步执行的回调方法
*/
@Override
public void operationComplete(ResponseFuture responseFuture)
//返回命令对象
RemotingCommand response = responseFuture.getResponseCommand();
if (response != null)
try
//解析响应获取结果
PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr);
assert pullResult != null;
//如果解析到了结果,那么调用pullCallback#onSuccess方法处理
pullCallback.onSuccess(pullResult);
catch (Exception e)
//出现异常则调用pullCallback#onException方法处理异常
pullCallback.onException(e);
else
//没有结果,都调用onException方法处理异常
if (!responseFuture.isSendRequestOK())
//发送失败
pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
else if (responseFuture.isTimeout())
//超时
pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
responseFuture.getCause()));
else
pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
);
1.1.1.1. processPullResponse解析响应
该方法处理response获取PullResult,根据响应的数据创建PullResultExt对象返回,注意此时拉取到的消息还是一个字节数组。
/**
* MQClientAPIImpl的方法
* 处理response获取PullResult
*/
private PullResult processPullResponse(
final RemotingCommand response,
final String addr) throws MQBrokerException, RemotingCommandException
PullStatus pullStatus = PullStatus.NO_NEW_MSG;
//设置结果状态码
switch (response.getCode())
case ResponseCode.SUCCESS:
pullStatus = PullStatus.FOUND;
break;
case ResponseCode.PULL_NOT_FOUND:
pullStatus = PullStatus.NO_NEW_MSG;
break;
case ResponseCode.PULL_RETRY_IMMEDIATELY:
pullStatus = PullStatus.NO_MATCHED_MSG;
break;
case ResponseCode.PULL_OFFSET_MOVED:
pullStatus = PullStatus.OFFSET_ILLEGAL;
break;
default:
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
//解析响应头
PullMessageResponseHeader responseHeader =
(PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
//根据响应的数据创建PullResultExt对象返回,此时拉取到的消息还是一个字节数组
return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
2 PullCallback回调
在processPullResponse处理response之后,会调用此前DefaultMQPushConsumerImpl#pullMessage方法中创建的PullCallback消息拉取的回调函数,执行onSuccess回调方法。如果解析过程中抛出异常,则调用onException方法。
onSuccess回调方法的大概逻辑为:
- 调用processPullResult方法处理pullResult,进行消息解码、过滤以及设置其他属性的操作,返回pullResult。
- 如果没有拉取到消息,那么设置下一次拉取的起始offset到PullRequest中,调用executePullRequestImmediately方法立即将拉取请求再次放入PullMessageService的pullRequestQueue中,PullMessageService是一个线程服务,PullMessageService将会循环的获取pullRequestQueue中的pullRequest然后向broker发起新的拉取消息请求,进行下次消息的拉取。
- 如果拉取到了消息,将拉取到的所有消息,存入对应的processQueue处理队列内部的msgTreeMap中,等待被异步的消费。
- 通过consumeMessageService将拉取到的消息构建为ConsumeRequest,然后通过内部的consumeExecutor线程池消费消息,consumeMessageService有ConsumeMessageConcurrentlyService并发消费和ConsumeMessageOrderlyService顺序消费两种实现。
- 获取配置的消息拉取间隔,默认为0,如果大于0则调用executePullRequestLater方法,等待间隔时间后将拉取请求再次放入pullRequestQueue中,否则立即调用executePullRequestImmediately放入pullRequestQueue中,进行下次消息的拉取。
如果是onException方法,那么延迟3s将拉取请求再次放入PullMessageService的pullRequestQueue中,等待下次拉取。
PullCallback pullCallback = new PullCallback()
@Override
public void onSuccess(PullResult pullResult)
if (pullResult != null)
/*
* 1 处理pullResult,进行消息解码、过滤以及设置其他属性的操作
*/
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus())
case FOUND:
//拉取的起始offset
long prevRequestOffset = pullRequest.getNextOffset();
//设置下一次拉取的起始offset到PullRequest中
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
//增加拉取耗时
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
//如果没有消息
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty())
/*
* 立即将拉取请求再次放入PullMessageService的pullRequestQueue中,PullMessageService是一个线程服务
* PullMessageService将会循环的获取pullRequestQueue中的pullRequest然后向broker发起新的拉取消息请求
* 进行下次消息的拉取
*/
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
else
//获取第一个消息的offset
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
//增加拉取tps
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
/*
* 2 将拉取到的所有消息,存入对应的processQueue处理队列内部的msgTreeMap中
*/
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
/*
* 3 通过consumeMessageService将拉取到的消息构建为ConsumeRequest,然后通过内部的consumeExecutor线程池消费消息
* consumeMessageService有ConsumeMessageConcurrentlyService并发消费和ConsumeMessageOrderlyService顺序消费两种实现
*/
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
/*
* 4 获取配置的消息拉取间隔,默认为0,则等待间隔时间后将拉取请求再次放入pullRequestQueue中,否则立即放入pullRequestQueue中
* 进行下次消息的拉取
*/
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0)
/*
* 将executePullRequestImmediately的执行放入一个PullMessageService的scheduledExecutorService延迟任务线程池中
* 等待给定的延迟时间到了之后再执行executePullRequestImmediately方法
*/
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
else
/*
* 立即将拉取请求再次放入PullMessageService的pullRequestQueue中,等待下次拉取
*/
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset)
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: firstMsgOffset: prevRequestOffset: ",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
break;
case NO_NEW_MSG:
//没有匹配到消息
case NO_MATCHED_MSG:
//更新下一次拉取偏移量
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
//立即将拉取请求再次放入PullMessageService的pullRequestQueue中,等待下次拉取
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
//请求offset不合法,过大或者过小
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, ",
pullRequest.toString(), pullResult.toString());
//更新下一次拉取偏移量
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
//丢弃拉取请求
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable()
@Override
public void run()
try
//更新下次拉取偏移量
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
//持久化offset
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
//移除对应的消费队列,同时将消息队列从负载均衡服务中移除
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, ", pullRequest);
catch (Throwable e)
log.error("executeTaskLater Exception", e);
, 10000);
break;
default:
break;
@Override
public void onException(Throwable e)
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))
log.warn("execute the pull request exception", e);
/*
* 出现异常,延迟3s将拉取请求再次放入PullMessageService的pullRequestQueue中,等待下次拉取
*/
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
;
2.1 processPullResult处理拉取结果
处理pullResult,进行消息解码、过滤以及设置其他属性的操作。
- 更新下次拉取建议的brokerId,下次拉取消息时从pullFromWhichNodeTable中直接取出。
- 对消息二进制字节数组进行解码转换为java的List消息集合。
- 如果存在tag,并且不是classFilterMode,那么按照tag过滤消息,这就是客户端的消息过滤。这采用String#equals方法过滤,而broker端则是比较的tagHash值,即hashCode。
- 如果有消息过滤钩子,那么执行钩子方法,这里可以扩展自定义的消息过滤的逻辑。
- 遍历过滤通过的消息,设置属性。例如事务id,最大、最小偏移量、brokerName。
- 将过滤后的消息存入msgFoundList集合
- 因为消息已经被解析了,那么设置消息的字节数组为null,释放内存。
/**
* PullAPIWrapper的方法
* 处理pullResult,进行消息解码、过滤以及设置其他属性的操作
*
* @param mq 消息队列
* @param pullResult 拉取结果
* @param subscriptionData 获取topic对应的SubscriptionData订阅关系
* @return 处理后的PullResult
*/
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
final SubscriptionData subscriptionData)
PullResultExt pullResultExt = (PullResultExt) pullResult;
/*
* 1 更新下次拉取建议的brokerId,下次拉取消息时从pullFromWhichNodeTable中直接取出
*/
this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
if (PullStatus.FOUND == pullResult.getPullStatus())
/*
* 2 对二进制字节数组进行解码转换为java的List<MessageExt>消息集合
*/
ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
List<MessageExt> msgListFilterAgain = msgList;
/*
* 3 如果存在tag,并且不是classFilterMode,那么按照tag过滤消息,这就是客户端的消息过滤
*/
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode())
msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
for (MessageExt msg : msgList)
if (msg.getTags() != null)
//这采用String#equals方法过滤,而broker端则是比较的tagHash值,即hashCode
if (subscriptionData.getTagsSet().contains(msg.getTags()))
msgListFilterAgain.add(msg);
/*
* 4 如果有消息过滤钩子,那么执行钩子方法,这里可以扩展自定义的消息过滤的逻辑
*/
if (this.hasHook())
FilterMessageContext filterMessageContext = new FilterMessageContext();
filterMessageContext.setUnitMode(unitMode);
filterMessageContext.setMsgList(msgListFilterAgain);
this.executeHook(filterMessageContext);
/*
* 5 遍历过滤通过的消息,设置属性
*/
for (MessageExt msg : msgListFilterAgain)
//事务消息标识
String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
//如果是事务消息,则设置事务id
if (Boolean.parseBoolean(traFlag))
msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
//将响应中的最小和最大偏移量存入msg
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
Long.toString(pullResult.getMinOffset()));
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
Long.toString(pullResult.getMaxOffset()));
//设置brokerName到msg
msg.setBrokerName(mq.getBrokerName());
//将过滤后的消息存入msgFoundList集合
pullResultExt.setMsgFoundList(msgListFilterAgainRocketMQ—消费者客户端详解
RocketMQ源码(20)—DefaultMQPushConsumer处理Broker的拉取消息响应源码