RocketMQ源码(20)—DefaultMQPushConsumer处理Broker的拉取消息响应源码

Posted 刘Java

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码(20)—DefaultMQPushConsumer处理Broker的拉取消息响应源码相关的知识,希望对你有一定的参考价值。

基于RocketMQ release-4.9.3,深入的介绍了DefaultMQPushConsumer处理Broker的拉取消息响应源码。

此前我们学习了Consumer如何发起的拉取消息请求,以及Broker如何处理拉取消息请求,现在我们来学习Consumer如何处理Broker的拉取消息响应的源码。

文章目录

1 客户端异步请求回调

此前我们讲过在consumer发起拉取消息请求的时候,通过ASYNC模式异步的进行拉取,并且InvokeCallback#operationComplete方法将会在得到结果之后进行回调,内部调用pullCallback的回调方法。

在回调方法中,如果解析到了响应结果,那么调用pullCallback#onSuccess方法处理,否则调用pullCallback#onException方法处理。

/**
 * MQClientAPIImpl的方法
 * 异步的拉取消息,并且触发回调函数
 *
 * @param addr          broker地址
 * @param request       请求命令对象
 * @param timeoutMillis 消费者消息拉取超时时间,默认30s
 * @param pullCallback  拉取到消息之后调用的回调函数
 * @throws RemotingException
 * @throws InterruptedException
 */
private void pullMessageAsync(
        final String addr,
        final RemotingCommand request,
        final long timeoutMillis,
        final PullCallback pullCallback
) throws RemotingException, InterruptedException 
    /*
     * 基于netty给broker发送异步消息,设置一个InvokeCallback回调对象
     *
     * InvokeCallback#operationComplete方法将会在得到结果之后进行回调,内部调用pullCallback的回调方法
     */
    this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() 
        /**
         * 异步执行的回调方法
         */
        @Override
        public void operationComplete(ResponseFuture responseFuture) 
            //返回命令对象
            RemotingCommand response = responseFuture.getResponseCommand();
            if (response != null) 
                try 
                    //解析响应获取结果
                    PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr);
                    assert pullResult != null;
                    //如果解析到了结果,那么调用pullCallback#onSuccess方法处理
                    pullCallback.onSuccess(pullResult);
                 catch (Exception e) 
                    //出现异常则调用pullCallback#onException方法处理异常
                    pullCallback.onException(e);
                
             else 
                //没有结果,都调用onException方法处理异常
                if (!responseFuture.isSendRequestOK()) 
                    //发送失败
                    pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
                 else if (responseFuture.isTimeout()) 
                    //超时
                    pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
                            responseFuture.getCause()));
                 else 
                    pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
                
            
        
    );

1.1.1.1. processPullResponse解析响应

该方法处理response获取PullResult,根据响应的数据创建PullResultExt对象返回,注意此时拉取到的消息还是一个字节数组。

/**
 * MQClientAPIImpl的方法
 * 处理response获取PullResult
 */
private PullResult processPullResponse(
        final RemotingCommand response,
        final String addr) throws MQBrokerException, RemotingCommandException 
    PullStatus pullStatus = PullStatus.NO_NEW_MSG;
    //设置结果状态码
    switch (response.getCode()) 
        case ResponseCode.SUCCESS:
            pullStatus = PullStatus.FOUND;
            break;
        case ResponseCode.PULL_NOT_FOUND:
            pullStatus = PullStatus.NO_NEW_MSG;
            break;
        case ResponseCode.PULL_RETRY_IMMEDIATELY:
            pullStatus = PullStatus.NO_MATCHED_MSG;
            break;
        case ResponseCode.PULL_OFFSET_MOVED:
            pullStatus = PullStatus.OFFSET_ILLEGAL;
            break;

        default:
            throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
    
    //解析响应头
    PullMessageResponseHeader responseHeader =
            (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
    //根据响应的数据创建PullResultExt对象返回,此时拉取到的消息还是一个字节数组
    return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
            responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());

2 PullCallback回调

在processPullResponse处理response之后,会调用此前DefaultMQPushConsumerImpl#pullMessage方法中创建的PullCallback消息拉取的回调函数,执行onSuccess回调方法。如果解析过程中抛出异常,则调用onException方法。

onSuccess回调方法的大概逻辑为:

  1. 调用processPullResult方法处理pullResult,进行消息解码、过滤以及设置其他属性的操作,返回pullResult。
  2. 如果没有拉取到消息,那么设置下一次拉取的起始offset到PullRequest中,调用executePullRequestImmediately方法立即将拉取请求再次放入PullMessageService的pullRequestQueue中,PullMessageService是一个线程服务,PullMessageService将会循环的获取pullRequestQueue中的pullRequest然后向broker发起新的拉取消息请求,进行下次消息的拉取
  3. 如果拉取到了消息,将拉取到的所有消息,存入对应的processQueue处理队列内部的msgTreeMap中,等待被异步的消费。
  4. 通过consumeMessageService将拉取到的消息构建为ConsumeRequest,然后通过内部的consumeExecutor线程池消费消息,consumeMessageService有ConsumeMessageConcurrentlyService并发消费和ConsumeMessageOrderlyService顺序消费两种实现。
  5. 获取配置的消息拉取间隔,默认为0,如果大于0则调用executePullRequestLater方法,等待间隔时间后将拉取请求再次放入pullRequestQueue中,否则立即调用executePullRequestImmediately放入pullRequestQueue中,进行下次消息的拉取。

如果是onException方法,那么延迟3s将拉取请求再次放入PullMessageService的pullRequestQueue中,等待下次拉取。

PullCallback pullCallback = new PullCallback() 
    @Override
    public void onSuccess(PullResult pullResult) 
        if (pullResult != null) 
            /*
             * 1 处理pullResult,进行消息解码、过滤以及设置其他属性的操作
             */
            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                    subscriptionData);

            switch (pullResult.getPullStatus()) 
                case FOUND:
                    //拉取的起始offset
                    long prevRequestOffset = pullRequest.getNextOffset();
                    //设置下一次拉取的起始offset到PullRequest中
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                    //增加拉取耗时
                    long pullRT = System.currentTimeMillis() - beginTimestamp;
                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                            pullRequest.getMessageQueue().getTopic(), pullRT);

                    long firstMsgOffset = Long.MAX_VALUE;
                    //如果没有消息
                    if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) 
                        /*
                         * 立即将拉取请求再次放入PullMessageService的pullRequestQueue中,PullMessageService是一个线程服务
                         * PullMessageService将会循环的获取pullRequestQueue中的pullRequest然后向broker发起新的拉取消息请求
                         * 进行下次消息的拉取
                         */
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                     else 
                        //获取第一个消息的offset
                        firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                        //增加拉取tps
                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                        /*
                         * 2 将拉取到的所有消息,存入对应的processQueue处理队列内部的msgTreeMap中
                         */
                        boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                        /*
                         * 3 通过consumeMessageService将拉取到的消息构建为ConsumeRequest,然后通过内部的consumeExecutor线程池消费消息
                         * consumeMessageService有ConsumeMessageConcurrentlyService并发消费和ConsumeMessageOrderlyService顺序消费两种实现
                         */
                        DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                pullResult.getMsgFoundList(),
                                processQueue,
                                pullRequest.getMessageQueue(),
                                dispatchToConsume);
                        /*
                         * 4 获取配置的消息拉取间隔,默认为0,则等待间隔时间后将拉取请求再次放入pullRequestQueue中,否则立即放入pullRequestQueue中
                         * 进行下次消息的拉取
                         */
                        if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) 
                            /*
                             * 将executePullRequestImmediately的执行放入一个PullMessageService的scheduledExecutorService延迟任务线程池中
                             * 等待给定的延迟时间到了之后再执行executePullRequestImmediately方法
                             */
                            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                    DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                         else 
                            /*
                             * 立即将拉取请求再次放入PullMessageService的pullRequestQueue中,等待下次拉取
                             */
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        
                    

                    if (pullResult.getNextBeginOffset() < prevRequestOffset
                            || firstMsgOffset < prevRequestOffset) 
                        log.warn(
                                "[BUG] pull message result maybe data wrong, nextBeginOffset:  firstMsgOffset:  prevRequestOffset: ",
                                pullResult.getNextBeginOffset(),
                                firstMsgOffset,
                                prevRequestOffset);
                    

                    break;
                case NO_NEW_MSG:
                    //没有匹配到消息
                case NO_MATCHED_MSG:
                    //更新下一次拉取偏移量
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                    //立即将拉取请求再次放入PullMessageService的pullRequestQueue中,等待下次拉取
                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    break;
                //请求offset不合法,过大或者过小
                case OFFSET_ILLEGAL:
                    log.warn("the pull request offset illegal,  ",
                            pullRequest.toString(), pullResult.toString());
                    //更新下一次拉取偏移量
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                    //丢弃拉取请求
                    pullRequest.getProcessQueue().setDropped(true);
                    DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() 

                        @Override
                        public void run() 
                            try 
                                //更新下次拉取偏移量
                                DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                        pullRequest.getNextOffset(), false);
                                //持久化offset
                                DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
                                //移除对应的消费队列,同时将消息队列从负载均衡服务中移除
                                DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                log.warn("fix the pull request offset, ", pullRequest);
                             catch (Throwable e) 
                                log.error("executeTaskLater Exception", e);
                            
                        
                    , 10000);
                    break;
                default:
                    break;
            
        
    

    @Override
    public void onException(Throwable e) 
        if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) 
            log.warn("execute the pull request exception", e);
        
        /*
         * 出现异常,延迟3s将拉取请求再次放入PullMessageService的pullRequestQueue中,等待下次拉取
         */
        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    
;

2.1 processPullResult处理拉取结果

处理pullResult,进行消息解码、过滤以及设置其他属性的操作。

  1. 更新下次拉取建议的brokerId,下次拉取消息时从pullFromWhichNodeTable中直接取出。
  2. 对消息二进制字节数组进行解码转换为java的List消息集合。
  3. 如果存在tag,并且不是classFilterMode,那么按照tag过滤消息,这就是客户端的消息过滤。这采用String#equals方法过滤,而broker端则是比较的tagHash值,即hashCode。
  4. 如果有消息过滤钩子,那么执行钩子方法,这里可以扩展自定义的消息过滤的逻辑。
  5. 遍历过滤通过的消息,设置属性。例如事务id,最大、最小偏移量、brokerName。
  6. 将过滤后的消息存入msgFoundList集合
  7. 因为消息已经被解析了,那么设置消息的字节数组为null,释放内存。
/**
 * PullAPIWrapper的方法
 * 处理pullResult,进行消息解码、过滤以及设置其他属性的操作
 *
 * @param mq               消息队列
 * @param pullResult       拉取结果
 * @param subscriptionData 获取topic对应的SubscriptionData订阅关系
 * @return 处理后的PullResult
 */
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
                                    final SubscriptionData subscriptionData) 
    PullResultExt pullResultExt = (PullResultExt) pullResult;
    /*
     * 1 更新下次拉取建议的brokerId,下次拉取消息时从pullFromWhichNodeTable中直接取出
     */
    this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());

    if (PullStatus.FOUND == pullResult.getPullStatus()) 
        /*
         * 2 对二进制字节数组进行解码转换为java的List<MessageExt>消息集合
         */
        ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
        List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

        List<MessageExt> msgListFilterAgain = msgList;
        /*
         * 3 如果存在tag,并且不是classFilterMode,那么按照tag过滤消息,这就是客户端的消息过滤
         */
        if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) 
            msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
            for (MessageExt msg : msgList) 
                if (msg.getTags() != null) 
                    //这采用String#equals方法过滤,而broker端则是比较的tagHash值,即hashCode
                    if (subscriptionData.getTagsSet().contains(msg.getTags())) 
                        msgListFilterAgain.add(msg);
                    
                
            
        
        /*
         * 4 如果有消息过滤钩子,那么执行钩子方法,这里可以扩展自定义的消息过滤的逻辑
         */
        if (this.hasHook()) 
            FilterMessageContext filterMessageContext = new FilterMessageContext();
            filterMessageContext.setUnitMode(unitMode);
            filterMessageContext.setMsgList(msgListFilterAgain);
            this.executeHook(filterMessageContext);
        
        /*
         * 5 遍历过滤通过的消息,设置属性
         */
        for (MessageExt msg : msgListFilterAgain) 
            //事务消息标识
            String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            //如果是事务消息,则设置事务id
            if (Boolean.parseBoolean(traFlag)) 
                msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
            
            //将响应中的最小和最大偏移量存入msg
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
                    Long.toString(pullResult.getMinOffset()));
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
                    Long.toString(pullResult.getMaxOffset()));
            //设置brokerName到msg
            msg.setBrokerName(mq.getBrokerName());
        
        //将过滤后的消息存入msgFoundList集合
        pullResultExt.setMsgFoundList(msgListFilterAgainRocketMQ—消费者客户端详解

RocketMQ源码(20)—DefaultMQPushConsumer处理Broker的拉取消息响应源码

RocketMQ源码(20)—DefaultMQPushConsumer处理Broker的拉取消息响应源码

Alibaba RocketMQ详析~

RocketMQ源码—RocketMQ源码调试环境准备

源码分析RocketMQ系列索引