RocketMQ源码(18)—DefaultMQPushConsumer消费者发起拉取消息请求源码
Posted 刘Java
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码(18)—DefaultMQPushConsumer消费者发起拉取消息请求源码相关的知识,希望对你有一定的参考价值。
基于RocketMQ release-4.9.3,深入的介绍了DefaultMQPushConsumer消费者发起拉取消息请求源码。
此前我们学习了DefaultMQPushConsumer负载均衡的源码,同时我们也知道了,最初始的PullRequest,就是在负载均衡之时对于新分配到的消费队列创建的。然后通过dispatchPullRequest方法对这些PullRequest进行分发,Push模式下这些请求会被PullMessageService依次处理,后续实现自动拉取消息,以及消费。
这些PullRequest将会被存入PullMessageService服务内部的pullRequestQueue集合中,后续异步的消费,自动执行拉取消息的请求,这就是Push模式下最初的拉消息请求的来源。关于如何拉去消息以及如何消费,将是我们下一部分的内容。
/**
* 分发处理消息
* @param pullRequestList
*/
@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList)
//遍历拉去请求
for (PullRequest pullRequest : pullRequestList)
//将请求存入PullMessageService服务的pullRequestQueue集合中,后续异步的消费,执行拉取消息的请求
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
log.info("doRebalance, , add a new pull request ", consumerGroup, pullRequest);
下面我们来介绍DefaultMQPushConsumer消费者如何拉取消息的源码,主要步骤可以分为三部分:
- consumer发送拉取消息请求。
- broker处理拉取消息请求。
- consumer接收请求响应。
本次我们学习consumer如何发送拉取消息请求。
文章目录
- 1 PullMessageService拉取消息
- 2 PullMessageService#pullMessage拉取消息
- 3 DefaultMQPushConsumerImpl#pullMessage拉取消息
- 4 总结
1 PullMessageService拉取消息
Push模式下,消息拉取由PullMessageService服务实现,PullMessageService继承了ServiceThread,因此他也是一个异步线程任务。
我们来看看它的run方法,该方法在一个循环中,不断地从pullRequestQueue中阻塞式的获取并移除队列的头部数据,即拉取消息的请求,然后调用pullMessage方法根据该请求去broker拉取消息。
也就是说只要pullRequestQueue队列中有拉取请求,它就会去Broker拉取消息,如果没有就阻塞。
@Override
public void run()
log.info(this.getServiceName() + " service started");
/*
* 运行时逻辑
* 如果服务没有停止,则在死循环中执行拉取消息的操作
*/
while (!this.isStopped())
try
//阻塞式的获取并移除队列的头部数据,即拉取消息的请求
PullRequest pullRequest = this.pullRequestQueue.take();
//根据该请求去broker拉取消息
this.pullMessage(pullRequest);
catch (InterruptedException ignored)
catch (Exception e)
log.error("Pull Message Service Run Method exception", e);
log.info(this.getServiceName() + " service end");
2 PullMessageService#pullMessage拉取消息
pullMessage方法是拉取消息的入口方法。内部实际调用DefaultMQPushConsumerImpl的pullMessage方法。
/**
* PullMessageService的方法
*
* @param pullRequest
*/
private void pullMessage(final PullRequest pullRequest)
//从consumerTable中获取pullRequest中保存的消费者组的消费者实例
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null)
//强制转型
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
//拉取消息
impl.pullMessage(pullRequest);
else
log.warn("No matched consumer for the PullRequest , drop it", pullRequest);
3 DefaultMQPushConsumerImpl#pullMessage拉取消息
下面是DefaultMQPushConsumerImpl的pullMessage方法的源码,代码很长内容很多,但是我们只需要关心几个重点步骤和方法:
- 服务状态校验。如果消费者服务状态异常,或者消费者暂停了,那么延迟发送拉取消息请求。
- 流控校验。默认请款下,如果processQueue中已缓存的消息总数量大于设定的阈值,默认1000,或者processQueue中已缓存的消息总大小大于设定的阈值,默认100MB那么同样延迟发送拉取消息请求。
- 顺序消费和并发消费的校验。如果是并发消费并且内存中消息的offset的最大跨度大于设定的阈值,默认2000。那么延迟发送拉取消息请求。如果是顺序消费并且没有锁定过,那么需要设置消费点位。
- 创建拉取消息的回调函数对象PullCallback,当拉取消息的请求返回之后,将会调用回调函数。这里面的源码我们后面再讲。
- 判断是否允许上报消费点位,如果是集群消费模式,并且本地内存有关于此mq的offset,那么设置commitOffsetEnable为true,表示拉取消息时可以上报消费位点给Broker进行持久化。
- 调用pullAPIWrapper.pullKernelImpl方法真正的拉取消息。
/**
* DefaultMQPushConsumerImpl的方法
* 拉取消息
*
* @param pullRequest 拉取消息请求
*/
public void pullMessage(final PullRequest pullRequest)
//获取对应的处理队列
final ProcessQueue processQueue = pullRequest.getProcessQueue();
//如果该处理队列已被丢去,那么直接返回
if (processQueue.isDropped())
log.info("the pull request[] is dropped.", pullRequest.toString());
return;
//设置最后的拉取时间戳
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
/*
* 1 状态校验
*/
try
//确定此consumer的服务状态正常,如果服务状态不是RUNNING,那么抛出异常
this.makeSureStateOK();
catch (MQClientException e)
log.warn("pullMessage exception, consumer state not ok", e);
//延迟3s发送拉取消息请求
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
return;
//如果消费者暂停了,那么延迟1s发送拉取消息请求
if (this.isPause())
log.warn("consumer was paused, execute pull request later. instanceName=, group=", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
/*
* 2 流控校验
*/
//获取processQueue中已缓存的消息总数量
long cachedMessageCount = processQueue.getMsgCount().get();
//获取processQueue中已缓存的消息总大小MB
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
//如果processQueue中已缓存的消息总数量大于设定的阈值,默认1000
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue())
//延迟50ms发送拉取消息请求
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0)
log.warn(
"the cached message count exceeds the threshold , so do flow control, minOffset=, maxOffset=, count=, size= MiB, pullRequest=, flowControlTimes=",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
return;
//如果processQueue中已缓存的消息总大小大于设定的阈值,默认100MB
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue())
//延迟50ms发送拉取消息请求
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0)
log.warn(
"the cached message size exceeds the threshold MiB, so do flow control, minOffset=, maxOffset=, count=, size= MiB, pullRequest=, flowControlTimes=",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
return;
/*
* 3 顺序消费和并发消费的校验
*/
//如果不是顺序消息,即并发消费
if (!this.consumeOrderly)
//如果内存中消息的offset的最大跨度大于设定的阈值,默认2000
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan())
//延迟50ms发送拉取消息请求
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0)
log.warn(
"the queue's messages, span too long, so do flow control, minOffset=, maxOffset=, maxSpan=, pullRequest=, flowControlTimes=",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
return;
else
//顺序消费校验,如果已锁定
if (processQueue.isLocked())
//如果此前没有锁定过,那么需要设置消费点位
if (!pullRequest.isPreviouslyLocked())
long offset = -1L;
try
//获取该MessageQueue的下一个消息的消费偏移量offset
offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
catch (Exception e)
//延迟3s发送拉取消息请求
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.error("Failed to compute pull offset, pullResult: ", pullRequest, e);
return;
//消费点位超前,那么重设消费点位
boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: NewOffset: brokerBusy: ",
pullRequest, offset, brokerBusy);
if (brokerBusy)
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: NewOffset: ",
pullRequest, offset);
//设置previouslyLocked为true
pullRequest.setPreviouslyLocked(true);
//重设消费点位
pullRequest.setNextOffset(offset);
else
//如果没有被锁住,那么延迟3s发送拉取消息请求
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.info("pull message later because not locked in broker, ", pullRequest);
return;
//获取topic对应的SubscriptionData订阅关系
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
//如果没有订阅信息
if (null == subscriptionData)
//延迟3s发送拉取消息请求
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.warn("find the consumer's subscription failed, ", pullRequest);
return;
//起始时间
final long beginTimestamp = System.currentTimeMillis();
/*
* 4 创建拉取消息的回调函数对象,当拉取消息的请求返回之后,将会指定回调函数
*/
PullCallback pullCallback = new PullCallback()
@Override
public void onSuccess(PullResult pullResult)
if (pullResult != null)
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus())
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty())
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
else
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0)
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
else
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset)
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: firstMsgOffset: prevRequestOffset: ",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
break;
case NO_NEW_MSG:
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, ",
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable()
@Override
public void run()
try
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, ", pullRequest);
catch (Throwable e)
log.error("executeTaskLater Exception", e);
, 10000);
break;
default:
break;
@Override
public void onException(Th以上是关于RocketMQ源码(18)—DefaultMQPushConsumer消费者发起拉取消息请求源码的主要内容,如果未能解决你的问题,请参考以下文章
RocketMQ源码(18)—DefaultMQPushConsumer消费者发起拉取消息请求源码
RocketMQ源码(18)—DefaultMQPushConsumer消费者发起拉取消息请求源码
RocketMQ源码(21)—ConsumeMessageConcurrentlyService并发消费消息源码