RocketMQ源码(18)—DefaultMQPushConsumer消费者发起拉取消息请求源码

Posted 刘Java

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码(18)—DefaultMQPushConsumer消费者发起拉取消息请求源码相关的知识,希望对你有一定的参考价值。

基于RocketMQ release-4.9.3,深入的介绍了DefaultMQPushConsumer消费者发起拉取消息请求源码。

此前我们学习了DefaultMQPushConsumer负载均衡的源码,同时我们也知道了,最初始的PullRequest,就是在负载均衡之时对于新分配到的消费队列创建的。然后通过dispatchPullRequest方法对这些PullRequest进行分发,Push模式下这些请求会被PullMessageService依次处理,后续实现自动拉取消息,以及消费。

这些PullRequest将会被存入PullMessageService服务内部的pullRequestQueue集合中,后续异步的消费,自动执行拉取消息的请求,这就是Push模式下最初的拉消息请求的来源。关于如何拉去消息以及如何消费,将是我们下一部分的内容。

/**
 * 分发处理消息
 * @param pullRequestList
 */
@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) 
    //遍历拉去请求
    for (PullRequest pullRequest : pullRequestList) 
        //将请求存入PullMessageService服务的pullRequestQueue集合中,后续异步的消费,执行拉取消息的请求
        this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
        log.info("doRebalance, , add a new pull request ", consumerGroup, pullRequest);
    

下面我们来介绍DefaultMQPushConsumer消费者如何拉取消息的源码,主要步骤可以分为三部分:

  1. consumer发送拉取消息请求。
  2. broker处理拉取消息请求。
  3. consumer接收请求响应。

本次我们学习consumer如何发送拉取消息请求。

文章目录

1 PullMessageService拉取消息

Push模式下,消息拉取由PullMessageService服务实现,PullMessageService继承了ServiceThread,因此他也是一个异步线程任务。

我们来看看它的run方法,该方法在一个循环中,不断地从pullRequestQueue中阻塞式的获取并移除队列的头部数据,即拉取消息的请求,然后调用pullMessage方法根据该请求去broker拉取消息。

也就是说只要pullRequestQueue队列中有拉取请求,它就会去Broker拉取消息,如果没有就阻塞。

@Override
public void run() 
    log.info(this.getServiceName() + " service started");
    /*
     * 运行时逻辑
     * 如果服务没有停止,则在死循环中执行拉取消息的操作
     */
    while (!this.isStopped()) 
        try 
            //阻塞式的获取并移除队列的头部数据,即拉取消息的请求
            PullRequest pullRequest = this.pullRequestQueue.take();
            //根据该请求去broker拉取消息
            this.pullMessage(pullRequest);
         catch (InterruptedException ignored) 
         catch (Exception e) 
            log.error("Pull Message Service Run Method exception", e);
        
    

    log.info(this.getServiceName() + " service end");

2 PullMessageService#pullMessage拉取消息

pullMessage方法是拉取消息的入口方法。内部实际调用DefaultMQPushConsumerImpl的pullMessage方法。

/**
 * PullMessageService的方法
 *
 * @param pullRequest
 */
private void pullMessage(final PullRequest pullRequest) 
    //从consumerTable中获取pullRequest中保存的消费者组的消费者实例
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) 
        //强制转型
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        //拉取消息
        impl.pullMessage(pullRequest);
     else 
        log.warn("No matched consumer for the PullRequest , drop it", pullRequest);
    

3 DefaultMQPushConsumerImpl#pullMessage拉取消息

下面是DefaultMQPushConsumerImpl的pullMessage方法的源码,代码很长内容很多,但是我们只需要关心几个重点步骤和方法:

  1. 服务状态校验。如果消费者服务状态异常,或者消费者暂停了,那么延迟发送拉取消息请求。
  2. 流控校验默认请款下,如果processQueue中已缓存的消息总数量大于设定的阈值,默认1000,或者processQueue中已缓存的消息总大小大于设定的阈值,默认100MB那么同样延迟发送拉取消息请求。
  3. 顺序消费和并发消费的校验如果是并发消费并且内存中消息的offset的最大跨度大于设定的阈值,默认2000。那么延迟发送拉取消息请求。如果是顺序消费并且没有锁定过,那么需要设置消费点位。
  4. 创建拉取消息的回调函数对象PullCallback,当拉取消息的请求返回之后,将会调用回调函数。这里面的源码我们后面再讲。
  5. 判断是否允许上报消费点位,如果是集群消费模式,并且本地内存有关于此mq的offset,那么设置commitOffsetEnable为true,表示拉取消息时可以上报消费位点给Broker进行持久化。
  6. 调用pullAPIWrapper.pullKernelImpl方法真正的拉取消息。
/**
 * DefaultMQPushConsumerImpl的方法
 * 拉取消息
 *
 * @param pullRequest 拉取消息请求
 */
public void pullMessage(final PullRequest pullRequest) 
    //获取对应的处理队列
    final ProcessQueue processQueue = pullRequest.getProcessQueue();
    //如果该处理队列已被丢去,那么直接返回
    if (processQueue.isDropped()) 
        log.info("the pull request[] is dropped.", pullRequest.toString());
        return;
    
    //设置最后的拉取时间戳
    pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
    /*
     * 1 状态校验
     */
    try 
        //确定此consumer的服务状态正常,如果服务状态不是RUNNING,那么抛出异常
        this.makeSureStateOK();
     catch (MQClientException e) 
        log.warn("pullMessage exception, consumer state not ok", e);
        //延迟3s发送拉取消息请求
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        return;
    
    //如果消费者暂停了,那么延迟1s发送拉取消息请求
    if (this.isPause()) 
        log.warn("consumer was paused, execute pull request later. instanceName=, group=", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
        return;
    
    /*
     * 2 流控校验
     */
    //获取processQueue中已缓存的消息总数量
    long cachedMessageCount = processQueue.getMsgCount().get();
    //获取processQueue中已缓存的消息总大小MB
    long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
    //如果processQueue中已缓存的消息总数量大于设定的阈值,默认1000
    if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) 
        //延迟50ms发送拉取消息请求
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueFlowControlTimes++ % 1000) == 0) 
            log.warn(
                    "the cached message count exceeds the threshold , so do flow control, minOffset=, maxOffset=, count=, size= MiB, pullRequest=, flowControlTimes=",
                    this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        
        return;
    
    //如果processQueue中已缓存的消息总大小大于设定的阈值,默认100MB
    if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) 
        //延迟50ms发送拉取消息请求
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueFlowControlTimes++ % 1000) == 0) 
            log.warn(
                    "the cached message size exceeds the threshold  MiB, so do flow control, minOffset=, maxOffset=, count=, size= MiB, pullRequest=, flowControlTimes=",
                    this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        
        return;
    
    /*
     * 3 顺序消费和并发消费的校验
     */
    //如果不是顺序消息,即并发消费
    if (!this.consumeOrderly) 
        //如果内存中消息的offset的最大跨度大于设定的阈值,默认2000
        if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) 
            //延迟50ms发送拉取消息请求
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) 
                log.warn(
                        "the queue's messages, span too long, so do flow control, minOffset=, maxOffset=, maxSpan=, pullRequest=, flowControlTimes=",
                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                        pullRequest, queueMaxSpanFlowControlTimes);
            
            return;
        
     else 
        //顺序消费校验,如果已锁定
        if (processQueue.isLocked()) 
            //如果此前没有锁定过,那么需要设置消费点位
            if (!pullRequest.isPreviouslyLocked()) 
                long offset = -1L;
                try 
                    //获取该MessageQueue的下一个消息的消费偏移量offset
                    offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
                 catch (Exception e) 
                    //延迟3s发送拉取消息请求
                    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
                    log.error("Failed to compute pull offset, pullResult: ", pullRequest, e);
                    return;
                
                //消费点位超前,那么重设消费点位
                boolean brokerBusy = offset < pullRequest.getNextOffset();
                log.info("the first time to pull message, so fix offset from broker. pullRequest:  NewOffset:  brokerBusy: ",
                        pullRequest, offset, brokerBusy);
                if (brokerBusy) 
                    log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest:  NewOffset: ",
                            pullRequest, offset);
                
                //设置previouslyLocked为true
                pullRequest.setPreviouslyLocked(true);
                //重设消费点位
                pullRequest.setNextOffset(offset);
            
         else 
            //如果没有被锁住,那么延迟3s发送拉取消息请求
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            log.info("pull message later because not locked in broker, ", pullRequest);
            return;
        
    
    //获取topic对应的SubscriptionData订阅关系
    final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    //如果没有订阅信息
    if (null == subscriptionData) 
        //延迟3s发送拉取消息请求
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        log.warn("find the consumer's subscription failed, ", pullRequest);
        return;
    
    //起始时间
    final long beginTimestamp = System.currentTimeMillis();
    /*
     * 4 创建拉取消息的回调函数对象,当拉取消息的请求返回之后,将会指定回调函数
     */
    PullCallback pullCallback = new PullCallback() 
        @Override
        public void onSuccess(PullResult pullResult) 
            if (pullResult != null) 
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);

                switch (pullResult.getPullStatus()) 
                    case FOUND:
                        long prevRequestOffset = pullRequest.getNextOffset();
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        long pullRT = System.currentTimeMillis() - beginTimestamp;
                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullRT);

                        long firstMsgOffset = Long.MAX_VALUE;
                        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) 
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                         else 
                            firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                            boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);

                            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) 
                                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                             else 
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            
                        

                        if (pullResult.getNextBeginOffset() < prevRequestOffset
                                || firstMsgOffset < prevRequestOffset) 
                            log.warn(
                                    "[BUG] pull message result maybe data wrong, nextBeginOffset:  firstMsgOffset:  prevRequestOffset: ",
                                    pullResult.getNextBeginOffset(),
                                    firstMsgOffset,
                                    prevRequestOffset);
                        

                        break;
                    case NO_NEW_MSG:
                    case NO_MATCHED_MSG:
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                    case OFFSET_ILLEGAL:
                        log.warn("the pull request offset illegal,  ",
                                pullRequest.toString(), pullResult.toString());
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        pullRequest.getProcessQueue().setDropped(true);
                        DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() 

                            @Override
                            public void run() 
                                try 
                                    DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                            pullRequest.getNextOffset(), false);

                                    DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                    DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                    log.warn("fix the pull request offset, ", pullRequest);
                                 catch (Throwable e) 
                                    log.error("executeTaskLater Exception", e);
                                
                            
                        , 10000);
                        break;
                    default:
                        break;
                
            
        

        @Override
        public void onException(Th

以上是关于RocketMQ源码(18)—DefaultMQPushConsumer消费者发起拉取消息请求源码的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ—消费者客户端详解

RocketMQ源码(18)—DefaultMQPushConsumer消费者发起拉取消息请求源码

RocketMQ源码(18)—DefaultMQPushConsumer消费者发起拉取消息请求源码

RocketMQ源码(21)—ConsumeMessageConcurrentlyService并发消费消息源码

RocketMQ源码(22)—ConsumeMessageOrderlyService顺序消费消息源码

RocketMQ源码(19)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码一万字