RocketMq消费者拉取消息源码

Posted 壹佰大多

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMq消费者拉取消息源码相关的知识,希望对你有一定的参考价值。

文章目录

1、DefaultMQPushConsumer分发处理消息源码

/**
 * 分发处理消息
 * @param pullRequestList
 */
@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) 
    //遍历拉去请求
    for (PullRequest pullRequest : pullRequestList) 
        //将请求存入PullMessageService服务的pullRequestQueue集合中,后续异步的消费,执行拉取消息的请求
        this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
        log.info("doRebalance, , add a new pull request ", consumerGroup, pullRequest);
    

2、pullMessageServer

Push模式下, 消息拉取由PullMessageService服务实现,PullMessageService继承了ServiceThread,因此他也是一个异步线程任务。

在它的run方法中,在循环中不断的从pullRequestQueue中阻塞式的获取并移除队列的头部数据,即拉取消息的请求,然后调用pullMessage方法根据该请求去broker拉取消息

 /**
     * PullMessageService的方法
     * 从MQClientInstance的启动流程中可以看出,RocketMQ使用一个单独的线程PullMessageService来负责消息的拉取。
     */
    @Override
    public void run() 
        logger.info(this.getServiceName() + " service started");
 
        //循环拉取消息
        //运行时逻辑。如果服务没有停止,则在死循环中执行拉取消息的操作。
        while (!this.isStopped()) 
            try 
                /**
                 * TODO: 从请求队列中获取拉取消息请求,刚开始肯定获取不到(因为没有经过重平衡分配数据),那么我们就要看是什么时候将拉取请求放入队列中的呢?
                 * 没错,这就要看重平衡服务了,这里最开始是阻塞的,经过重平衡可以获取到PullRequest
                 */
                MessageRequest messageRequest = this.messageRequestQueue.take();
                /**
                 * 如果消息请求模式是POP
                 * POP:在pop模式下工作的消费者可以共享MessageQueue
                 */
                if (messageRequest.getMessageRequestMode() == MessageRequestMode.POP) 
                    //执行popMessage
                    this.popMessage((PopRequest)messageRequest);
                 else 
                    //TODO: 拉取消息
                    this.pullMessage((PullRequest)messageRequest);
                
             catch (InterruptedException ignored) 
             catch (Exception e) 
                logger.error("Pull Message Service Run Method exception", e);
            
        
 
        logger.info(this.getServiceName() + " service end");
    

3、pullMessageServer的pullMessage源码分析

pullMessage方法是拉取消息的入口方法。内部实际调用DefaultMQPushConsumerImpl的pullMessage方法。

  /**
     * PullMessageService的方法
     * 拉取消息服务
     * @param pullRequest       拉取请求
     */
    private void pullMessage(final PullRequest pullRequest) 
        //从consumerTable中获取pullRequest中保存的消费者组的取消费者实例
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) 
            //强转为推送模式消费者
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            //拉取消息
            impl.pullMessage(pullRequest);
         else 
            logger.warn("No matched consumer for the PullRequest , drop it", pullRequest);
        
    

4、DefaultMQPushConsumerImpl的拉取消息

  1. 服务状态校验。如果消费者服务状态异常,或者消费者暂停了,那么延迟发送拉取消息请求。

  2. 流控校验。默认情况下,如果processQueue中已缓存的消息总数量大于设定的阈值,默认1000,或者processQueue中已缓存的消息总大小大于设定的阈值,默认100MB那么同样延迟发送拉取消息请求。

  3. 顺序消费和并发消费的校验。如果是并发消费并且内存中消息的offset的最大跨度大于设定的阈值,默认2000。那么延迟发送拉取消息请求。如果是顺序消费并且没有锁定过,那么需要设置消费点位。

  4. 创建拉取消息的回调函数对象PullCallback,当拉取消息的请求返回之后,将会调用回调函数。这里面的源码我们后面再讲。

  5. 判断是否允许上报消费点位,如果是集群消费模式,并且本地内存有关于此mq的offset,那么设置commitOffsetEnable为true,表示拉取消息时可以上报消费位点给Broker进行持久化。

  6. 调用pullAPIWrapper.pullKernelImpl方法真正的拉取消息。

   /**
     * DefaultMQPushConsumerImpl的方法
     * 拉取消息
     * @param pullRequest       拉取消息请求
     */
    public void pullMessage(final PullRequest pullRequest) 
        //获取对应的处理队列
        final ProcessQueue processQueue = pullRequest.getProcessQueue();
        //如果该处理队列被丢弃,直接返回(默认是false)
        if (processQueue.isDropped()) 
            log.info("the pull request[] is dropped.", pullRequest.toString());
            return;
        
 
        //如果处理队列未被丢弃,更新时间戳
        pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
 
        /**
         * 1. 状态校验
         */
        try 
            //确保此consumer的服务状态正常,服务状态为RUNNING才为正常,如果服务状态不是RUNNING,那么抛出异常
            this.makeSureStateOK();
         catch (MQClientException e) 
            log.warn("pullMessage exception, consumer state not ok", e);
            //延迟3s发送拉取消息请求
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            return;
        
 
        //如果消费者暂停了,那么延迟1s后再发送拉取消息请求(默认是false)
        if (this.isPause()) 
            log.warn("consumer was paused, execute pull request later. instanceName=, group=", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
            return;
        
 
        /**
         * 2. 流控校验
         */
        //获得processQueue中已缓存的消息总数量
        long cachedMessageCount = processQueue.getMsgCount().get();
        //获取processQueue中已缓存的消息总大小MB
        long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
 
        //TODO: 判断是否出发流控?目的:流控主要是保护消费者。当消费者消费能力不够时,拉取速度太快会导致大量消息积压,很可能导致内存溢出。
        /**
         * TODO:如果消息数量大于配置,则从消息数量进行流控
         * 1. 判断queue缓存的消息数量是否超过1000(可以根据pullThresholdForQueue参数配置)
         *    如果超过了1000,则先不去broker拉取消息,而是先暂停50ms,然后重新将对象放入队列(this.pullRequestQueue.put(pullRequest)),然后重新拉取
         */
        if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) 
            //延迟执行拉取消息请求
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) 
                //缓存的消息计数超过了阈值{},因此流控制也超过了阈值
                log.warn(
                    "the cached message count exceeds the threshold , so do flow control, minOffset=, maxOffset=, count=, size= MiB, pullRequest=, flowControlTimes=",
                    this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            
            return;
        
 
        /**
         * TODO: 如果消息大小超过配置,则从消息大小进行流控
         * 2. 判断queue缓存的消息大小是否超过100M(可以根据pullThresholdSizeForQueue参数配置)
         *    如果超过100M,则先不去broker拉取消息,而是先暂停50ms,然后重新将对象放入队列中(this.pullRequestQueue.put(pullRequest)),然后重新拉取
         */
        if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) 
            //延迟执行拉取消息请求
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) 
                //缓存的消息大小超过了阈值{}MiB,流控制也是如此
                log.warn(
                    "the cached message size exceeds the threshold  MiB, so do flow control, minOffset=, maxOffset=, count=, size= MiB, pullRequest=, flowControlTimes=",
                    this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            
            return;
        
 
        /**
         * 3 顺序消费和并发消费的校验
         */
        //如果不是顺序消费,即并发消费
        if (!this.consumeOrderly) 
            //如果内存中消息的offset的最大跨度大于设置的阈值,默认2000
            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) 
                //延迟50ms发送拉取消息请求
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) 
                    log.warn(
                        "the queue's messages, span too long, so do flow control, minOffset=, maxOffset=, maxSpan=, pullRequest=, flowControlTimes=",
                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                        pullRequest, queueMaxSpanFlowControlTimes);
                
                return;
            
         else 
            //顺序消费校验,如果已经锁定
            if (processQueue.isLocked()) 
                //如果此前没有锁定过,那么需要设置消费点位(默认false)
                if (!pullRequest.isPreviouslyLocked()) 
                    long offset = -1L;
                    try 
                        //获取该MessageQueue的下一个消息的消费偏移量offset
                        offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
                        if (offset < 0) 
                            throw new MQClientException(ResponseCode.SYSTEM_ERROR, "Unexpected offset " + offset);
                        
                     catch (Exception e) 
                        //延迟3s发送拉取消息请求
                        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
                        log.error("Failed to compute pull offset, pullResult: ", pullRequest, e);
                        return;
                    
                    boolean brokerBusy = offset < pullRequest.getNextOffset();
                    //第一次拉取消息时,请修复来自代理的偏移量。pull请求:
                    log.info("the first time to pull message, so fix offset from broker. pullRequest:  NewOffset:  brokerBusy: ",
                        pullRequest, offset, brokerBusy);
                    if (brokerBusy) 
                        //第一次拉取消息,但拉取请求偏移量大于代理消耗偏移量
                        log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest:  NewOffset: ",
                            pullRequest, offset);
                    
 
                    //设置previouslyLocked为true
                    pullRequest.setPreviouslyLocked(true);
                    //重设消费点位
                    pullRequest.setNextOffset(offset);
                
             else 
                //如果没有被锁住,那么延迟3s发送拉取消息请求
                this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
                log.info("pull message later because not locked in broker, ", pullRequest);
                return;
            
        
 
        //获取topic对应的SubscriptionData订阅关系
        final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        //如果订阅信息为null,则表示发现消费者的订阅失败
        if (null == subscriptionData) 
            //延迟3s发送拉取消息请求
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            log.warn("find the consumer's subscription failed, ", pullRequest);
            return;
        
 
        //起始时间
        final long beginTimestamp = System.currentTimeMillis();
 
        /**
         * 4. 创建拉取消息的回调函数对象,当拉取消息的请求返回之后,将会指定回调函数
         */
        //TODO: 构建消息处理的回调对象,它的非常重要的,等从broker拉取消息后(这里是从broker拉取消息成功后才执行的),会交给它来处理
        PullCallback pullCallback = new PullCallback() 
            @Override
            public void onSuccess(PullResult pullResult) 
                if (pullResult != null) 
                    /**
                     * 拉取成功,开始处理从broker读取到的消息
                     * 将二进制内容转换成MessageExt对象,并根据TAG的值进行过滤
                     */
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);
 
                    switch (pullResult.getPullStatus()) 
                        //TODO: 发现了消息
                        case FOUND:
                            long prevRequestOffset = pullRequest.getNextOffset();
                            //TODO: 更新nextOffset的值
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            long pullRT = System.currentTimeMillis() - beginTimestamp;
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullRT);
 
                            long firstMsgOffset = Long.MAX_VALUE;
                            /**
                             * TODO:如果没有消息则立即执行,立即拉取的意思是继续将PullRequest放入队列中
                             * 这样take()方法将不会再阻塞,然后继续从broker拉取消息,从而达到持续从broker拉取消息
                             */
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) 
                                //拉取消息
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                             else 
                                //拉取的消息不为空
                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
 
                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
 
                                //TODO: 将本次读取到的所有信息(经过TAG/SQL过滤)保存到本地缓存队列processQueue中
                                boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                /**
                                 * TODO: 构建consumeRequest,将消息提交到线程池中,由ConsumerMessageService 进行消费
                                 * 由于我们的是普通消息(不是顺序消息),所以由ConsumeMessageConcurrentlyService类来消费消息
                                 * ConsumeMessageConcurrentlyService内部会创建一个线程池ThreadPoolExecutor,这个xcc非常重要,消息最终将提交到这个线程池中
                                 */
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);
 
                                /**
                                 * 上面是异步消费,然后这里是将PullRequest放入队列中,这样take()方法将不会阻塞
                                 * 然后继续从broker中拉取消息,从而到达持续从broker中拉取消息
                                 * 延迟pullInterval 时间再去拉取消息:
                                 *      这里有一个 pullInterval参数,表示间隔多长时间在放入队列中(实际上就是间隔多长时间再去broker拉取消息)。当消费者消费速度比生产者快的时候,可以考虑设置这个值,这样可以避免大概率拉取到空消息。
                                 *
                                 */
                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) 
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                 else 
                                    //立即拉取消息
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                
                            
 
                            if (pullResult.getNextBeginOffset() < prevRequestOffset
                                || firstMsgOffset < prevRequestOffset) 
                                log.warn(
                                    "[BUG] pull message result maybe data wrong, nextBeginOffset:  firstMsgOffset:  prevRequestOffset: ",
                                    pullResult.getNextBeginOffset(),
                                    firstMsgOffset,
                                    prevRequestOffset);
                            
 
                            break;
                        case NO_NEW_MSG:
                        case NO_MATCHED_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
 
                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
 
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case OFFSET_ILLEGAL:
                            log.warn("the pull request offset illegal,  ",
                                pullRequest.toString(), pullResult.toString(

RocketMQ源码(19)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码一万字

基于RocketMQ release-4.9.3,深入的介绍了Broker处理DefaultMQPushConsumer发起的拉取消息请求源码。

此前我们学习了RocketMQ源码(18)—DefaultMQPushConsumer消费者发起拉取消息请求源码。我们知道consumer在发送了拉取消息请求的时候,请求的Code为PULL_MESSAGE,broker端接收到请求之后,将会更根据不同的Code调用不同的处理器进行处理,而PULL_MESSAGE拉取消息的请求则是通过PullMessageProcessor处理的。

下面我们来看看Broker处理DefaultMQPushConsumer发起的拉取消息请求源码。

1 PullMessageProcessor处理拉取请求

PullMessageProcessor作为broker的拉取消息处理器,用于处理拉取消息的请求,他在BrokerController实例化的时候跟着实例化。

然后在registerProcessor方法中,将Code为PULL_MESSAGE的请求与其绑定。


随后在processRequestCommand方法中,会根据请求的Code选择不同的netty处理器进行处理,调用方法为asyncProcessRequest:


在asyncProcessRequest方法中,将会调用processRequest方法,该方法由各个处理器实现,这里就是PullMessageProcessor处理器处理器PULL_MESSAGE拉取消息请求的入口方法。

/**
 * PullMessageProcessor的方法
 * <p>
 * 处理PULL_MESSAGE拉取消息请求
 */
@Override
public RemotingCommand processRequest(final ChannelHandlerContext ctx,
                                      RemotingCommand request) throws RemotingCommandException 
    //调用另一个processRequest方法,第三个参数broker是否支持持挂起请求为true
    return this.processRequest(ctx.channel(), request, true);

2 processRequest处理拉取消息请求

**该方法处理拉取消息的请求,包括构建过滤信息,拉取消息,拉取结果处理(判断直接返回响应还是挂起请求),上报消费点位等步骤,源码非常多,不易理解。**大概步骤为(详细步骤请看源码注释):

  1. 构建过滤信息。基于TAG会获取subscriptionData,基于classFilter还会获取consumerFilterData,最后根据它们构建MessageFilter对象。
    1. 如果有子订阅标记,即hasSubscriptionFlag = true,那么每次拉取消息都会重新构建subscriptionData和consumerFilterData,而不是使用缓存的数据。
    2. 一般hasSubscriptionFlag都是false,因为hasSubscriptionFlag为true需要consumer端将postSubscriptionWhenPull=true,并且订阅不是classFilter模式同时满足
  2. 通过DefaultMessageStore#getMessage方法拉取消息,并且进行过滤操作。这是拉取消息的核心方法,涉及到查找ConsumeQueue和CommitLog文件数据。返回拉取结果GetMessageResult。
  3. 对于拉取结果GetMessageResult进行处理,设置响应数据。
    1. 判断并设置下次拉取消息的建议broker是MASTER还是SLAVE。
      1. 如果getMessage方法返回的GetMessageResult的suggestPullingFromSlave属性为true,则设置responseHeader的suggestWhichBrokerId属性值为1,即建议下次从SLAVE拉取,否则设置为0,建议下次从MASTER拉取。
      2. 判断broker角色。如果是SLAVE,并且slaveReadEnable = false。那么设置responseHeader的suggestWhichBrokerId属性值为0,即建议下次从MASTER拉取。
      3. 如果slaveReadEnable = true,并且如果如果消费太慢了,那么下次重定向到另一台broker,id通过subscriptionGroupConfig的whichBrokerWhenConsumeSlowly指定,默认1,即SLAVE。否则id通过subscriptionGroupConfig的brokerId指定,默认0,即MASTER。如果slaveReadEnable = false,设置建议的brokerId为MASTER。
    2. 判断getMessageResult的状态码,并设置response的对应的响应码。
    3. 判断如果有消费钩子,那么执行消费钩子的consumeMessageBefore方法。
    4. 判断响应码,然后直接返回数据或者进行短轮询或者长轮询。
      1. 如果拉取消息成功,那么更新一些统计信息,然后从buffer中读取出消息转换为字节数组,存入response的body中。
      2. 如果没有读取到消息,如果broker允许挂起请求并且客户端支持请求挂起,则broker挂起该请求一段时间,中间如果有消息到达则会唤醒请求拉取消息并返回。
        1. 计算最长挂起时间,如果支持长轮询则默认最长挂起15s,否则使用短轮询,挂起最长1s。
        2. 构建一个PullRequest,通过pullRequestHoldService#suspendPullRequest方法提交PullRequest,该请求将会被挂起并异步处理。
          iii. 如果读取的offset不正确,太大或者太小,发布offset移除事件。
  4. 拉取消息完毕之后,无论是否拉取到消息,只要broker支持挂起请求(新的拉取请求为true,但是已被suspend的请求将会是false),并且consumer支持提交消费进度,并且当前broker不是SLAVE角色,那么通过ConsumerOffsetManager#commitOffset方法提交消费进度偏移量。
/**
 * PullMessageProcessor的方法
 * <p>
 * 处理拉取消息请求
 *
 * @param channel            通连接道
 * @param request            请求
 * @param brokerAllowSuspend broker是否支持挂起请求
 * @return
 * @throws RemotingCommandException
 */
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) throws RemotingCommandException 
    //起始时间
    final long beginTimeMills = this.brokerController.getMessageStore().now();
    //创建响应命令对象
    RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
    //创建响应头
    final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
    //解析请求头
    final PullMessageRequestHeader requestHeader = (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
    //设置请求id,通过id可以获取请求结果
    response.setOpaque(request.getOpaque());

    log.debug("receive PullMessage request command, ", request);
    //当前broker是否可读,不可读则直接返回
    if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) 
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
        return response;
    
    //获取当前consumerGroup对应的订阅信息
    SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
    if (null == subscriptionGroupConfig) 
        response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
        response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
        return response;
    
    //判断是否可消费,不可消费则直接返回
    if (!subscriptionGroupConfig.isConsumeEnable()) 
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
        return response;
    
    //是否支持请求挂起
    final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
    //是否提交消费进度
    final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
    //是否存在子订阅,即TAG或者SQL92的设置,用于过滤消息
    final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
    //计算broker最长的挂起时间,默认15s,该参数是消费者传递的
    final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;
    //获取topic配置
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    if (null == topicConfig) 
        log.error("the topic  not exist, consumer: ", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
        return response;
    
    //topic是否可读,不可读则直接返回
    if (!PermName.isReadable(topicConfig.getPerm())) 
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
        return response;
    
    //校验请求中的队列id,如果小于0或者大于等于topic配置中的读队列数量,那么直接返回
    if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) 
        String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]", requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
        log.warn(errorInfo);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(errorInfo);
        return response;
    
    /*
     * 1 构建过滤信息
     * 真正的过滤消息操作还在后面,而且broker和consumer都会进行过滤
     */
    SubscriptionData subscriptionData = null;
    ConsumerFilterData consumerFilterData = null;
    //如果有子订阅标记,那么每次拉取消息都会重新构建subscriptionData和consumerFilterData,而不是使用缓存的信息,一般都是false
    //因为hasSubscriptionFlag为true需要consumer端将postSubscriptionWhenPull=true,并且订阅不是classFilter模式同时满足
    if (hasSubscriptionFlag) 
        try 
            subscriptionData = FilterAPI.build(requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType());
            if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) 
                consumerFilterData = ConsumerFilterManager.build(requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(), requestHeader.getExpressionType(), requestHeader.getSubVersion());
                assert consumerFilterData != null;
            
         catch (Exception e) 
            log.warn("Parse the consumer's subscription[] failed, group: ", requestHeader.getSubscription(), requestHeader.getConsumerGroup());
            response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
            response.setRemark("parse the consumer's subscription failed");
            return response;
        
     else 
        //获取消费者组信息
        ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
        if (null == consumerGroupInfo) 
            log.warn("the consumer's group info not exist, group: ", requestHeader.getConsumerGroup());
            response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
            response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
            return response;
        
        //如果不支持广播消费但是消费者消费模式是广播消费,则直接返回
        if (!subscriptionGroupConfig.isConsumeBroadcastEnable() && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) 
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
            return response;
        
        //获取broker缓存的此consumerGroupInfo中关于此topic的订阅关系
        subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
        if (null == subscriptionData) 
            log.warn("the consumer's subscription not exist, group: , topic:", requestHeader.getConsumerGroup(), requestHeader.getTopic());
            response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
            response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
            return response;
        
        //比较订阅关系版本
        if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) 
            log.warn("The broker's subscription is not latest, group:  ", requestHeader.getConsumerGroup(), subscriptionData.getSubString());
            response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
            response.setRemark("the consumer's subscription not latest");
            return response;
        
        //如果订阅关系表达式不是TAG类型,那么构建consumerFilterData
        if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) 
            consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(), requestHeader.getConsumerGroup());
            if (consumerFilterData == null) 
                response.setCode(ResponseCode.FILTER_DATA_NOT_EXIST);
                response.setRemark("The broker's consumer filter data is not exist!Your expression may be wrong!");
                return response;
            
            if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) 
                log.warn("The broker's consumer filter data is not latest, group: , topic: , serverV: , clientV: ", requestHeader.getConsumerGroup(), requestHeader.getTopic(), consumerFilterData.getClientVersion(), requestHeader.getSubVersion());
                response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST);
                response.setRemark("the consumer's consumer filter data not latest");
                return response;
            
        
    
    //如果订阅关系表达式不是TAG类型,并且enablePropertyFilter没有开启支持SQL92,那么抛出异常
    //也就是说,如果消费者使用SQL92模式订阅,那么需要现在broker端设置enablePropertyFilter=true
    if (!ExpressionType.isTagType(subscriptionData.getExpressionType()) && !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) 
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
        return response;
    

    MessageFilter messageFilter;
    //重试topic是否支持filter过滤,默认false,即重试topic是不支持过滤额
    if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) 
        messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData, this.brokerController.getConsumerFilterManager());
     else 
        //创建普通的ExpressionMessageFilter,内部保存了消费者启动时通过心跳上报的订阅关系
        //一般基于tag订阅的情况下,consumerFilterData是null,通过subscriptionData进行过滤
        messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData, this.brokerController.getConsumerFilterManager());
    

    /*
     * 2 通过DefaultMessageStore#getMessage方法批量拉取消息,并且进行过滤操作
     */
    final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
    /*
     * 3 对于拉取结果GetMessageResult进行处理
     */
    if (getMessageResult != null) 
        //设置拉去状态枚举名字
        response.setRemark(getMessageResult.getStatus().name());
        //设置下次拉取的consumeQueue的起始逻辑偏移量
        responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
        //设置consumeQueue的最小、最大的逻辑偏移量maxOffset和minOffset
        responseHeader.setMinOffset(getMessageResult.getMinOffset());
        responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
        /*
         * 3.1 判断并设置下次拉取消息的建议broker是MATER还是SLAVE
         */
        //是否建议从SLAVE拉取消息
        if (getMessageResult.isSuggestPullingFromSlave()) 
            //设置建议的brokerId为从服务器的id 1
            responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
         else 
            //否则,设置建议的brokerId为主服务器的id 0
            responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
        
        //判断broker角色
        switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) 
            case ASYNC_MASTER:
            case SYNC_MASTER:
                break;
            case SLAVE:
                //如果是SLAVE,并且从服务器不可读
                if (!this.brokerController.getBrokerConfig().以上是关于RocketMq消费者拉取消息源码的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ源码(18)—DefaultMQPushConsumer消费者发起拉取消息请求源码

RocketMQ源码(18)—DefaultMQPushConsumer消费者发起拉取消息请求源码

RocketMQ源码(19)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码一万字

RocketMQ源码(19)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码一万字

源码分析RocketMQ消息消费机制----消费者拉取消息机制

RocketMQ源码(20)—DefaultMQPushConsumer处理Broker的拉取消息响应源码

(c)2006-2024 SYSTEM All Rights Reserved IT常识