RocketMQ源码(19)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码一万字

Posted 刘Java

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码(19)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码一万字相关的知识,希望对你有一定的参考价值。

基于RocketMQ release-4.9.3,深入的介绍了Broker处理DefaultMQPushConsumer发起的拉取消息请求源码。

此前我们学习了RocketMQ源码(18)—DefaultMQPushConsumer消费者发起拉取消息请求源码。我们知道consumer在发送了拉取消息请求的时候,请求的Code为PULL_MESSAGE,broker端接收到请求之后,将会更根据不同的Code调用不同的处理器进行处理,而PULL_MESSAGE拉取消息的请求则是通过PullMessageProcessor处理的。

下面我们来看看Broker处理DefaultMQPushConsumer发起的拉取消息请求源码。

1 PullMessageProcessor处理拉取请求

PullMessageProcessor作为broker的拉取消息处理器,用于处理拉取消息的请求,他在BrokerController实例化的时候跟着实例化。

然后在registerProcessor方法中,将Code为PULL_MESSAGE的请求与其绑定。


随后在processRequestCommand方法中,会根据请求的Code选择不同的netty处理器进行处理,调用方法为asyncProcessRequest:


在asyncProcessRequest方法中,将会调用processRequest方法,该方法由各个处理器实现,这里就是PullMessageProcessor处理器处理器PULL_MESSAGE拉取消息请求的入口方法。

/**
 * PullMessageProcessor的方法
 * <p>
 * 处理PULL_MESSAGE拉取消息请求
 */
@Override
public RemotingCommand processRequest(final ChannelHandlerContext ctx,
                                      RemotingCommand request) throws RemotingCommandException 
    //调用另一个processRequest方法,第三个参数broker是否支持持挂起请求为true
    return this.processRequest(ctx.channel(), request, true);

2 processRequest处理拉取消息请求

**该方法处理拉取消息的请求,包括构建过滤信息,拉取消息,拉取结果处理(判断直接返回响应还是挂起请求),上报消费点位等步骤,源码非常多,不易理解。**大概步骤为(详细步骤请看源码注释):

  1. 构建过滤信息。基于TAG会获取subscriptionData,基于classFilter还会获取consumerFilterData,最后根据它们构建MessageFilter对象。
    1. 如果有子订阅标记,即hasSubscriptionFlag = true,那么每次拉取消息都会重新构建subscriptionData和consumerFilterData,而不是使用缓存的数据。
    2. 一般hasSubscriptionFlag都是false,因为hasSubscriptionFlag为true需要consumer端将postSubscriptionWhenPull=true,并且订阅不是classFilter模式同时满足
  2. 通过DefaultMessageStore#getMessage方法拉取消息,并且进行过滤操作。这是拉取消息的核心方法,涉及到查找ConsumeQueue和CommitLog文件数据。返回拉取结果GetMessageResult。
  3. 对于拉取结果GetMessageResult进行处理,设置响应数据。
    1. 判断并设置下次拉取消息的建议broker是MASTER还是SLAVE。
      1. 如果getMessage方法返回的GetMessageResult的suggestPullingFromSlave属性为true,则设置responseHeader的suggestWhichBrokerId属性值为1,即建议下次从SLAVE拉取,否则设置为0,建议下次从MASTER拉取。
      2. 判断broker角色。如果是SLAVE,并且slaveReadEnable = false。那么设置responseHeader的suggestWhichBrokerId属性值为0,即建议下次从MASTER拉取。
      3. 如果slaveReadEnable = true,并且如果如果消费太慢了,那么下次重定向到另一台broker,id通过subscriptionGroupConfig的whichBrokerWhenConsumeSlowly指定,默认1,即SLAVE。否则id通过subscriptionGroupConfig的brokerId指定,默认0,即MASTER。如果slaveReadEnable = false,设置建议的brokerId为MASTER。
    2. 判断getMessageResult的状态码,并设置response的对应的响应码。
    3. 判断如果有消费钩子,那么执行消费钩子的consumeMessageBefore方法。
    4. 判断响应码,然后直接返回数据或者进行短轮询或者长轮询。
      1. 如果拉取消息成功,那么更新一些统计信息,然后从buffer中读取出消息转换为字节数组,存入response的body中。
      2. 如果没有读取到消息,如果broker允许挂起请求并且客户端支持请求挂起,则broker挂起该请求一段时间,中间如果有消息到达则会唤醒请求拉取消息并返回。
        1. 计算最长挂起时间,如果支持长轮询则默认最长挂起15s,否则使用短轮询,挂起最长1s。
        2. 构建一个PullRequest,通过pullRequestHoldService#suspendPullRequest方法提交PullRequest,该请求将会被挂起并异步处理。
          iii. 如果读取的offset不正确,太大或者太小,发布offset移除事件。
  4. 拉取消息完毕之后,无论是否拉取到消息,只要broker支持挂起请求(新的拉取请求为true,但是已被suspend的请求将会是false),并且consumer支持提交消费进度,并且当前broker不是SLAVE角色,那么通过ConsumerOffsetManager#commitOffset方法提交消费进度偏移量。
/**
 * PullMessageProcessor的方法
 * <p>
 * 处理拉取消息请求
 *
 * @param channel            通连接道
 * @param request            请求
 * @param brokerAllowSuspend broker是否支持挂起请求
 * @return
 * @throws RemotingCommandException
 */
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) throws RemotingCommandException 
    //起始时间
    final long beginTimeMills = this.brokerController.getMessageStore().now();
    //创建响应命令对象
    RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
    //创建响应头
    final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
    //解析请求头
    final PullMessageRequestHeader requestHeader = (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
    //设置请求id,通过id可以获取请求结果
    response.setOpaque(request.getOpaque());

    log.debug("receive PullMessage request command, ", request);
    //当前broker是否可读,不可读则直接返回
    if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) 
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
        return response;
    
    //获取当前consumerGroup对应的订阅信息
    SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
    if (null == subscriptionGroupConfig) 
        response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
        response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
        return response;
    
    //判断是否可消费,不可消费则直接返回
    if (!subscriptionGroupConfig.isConsumeEnable()) 
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
        return response;
    
    //是否支持请求挂起
    final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
    //是否提交消费进度
    final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
    //是否存在子订阅,即TAG或者SQL92的设置,用于过滤消息
    final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
    //计算broker最长的挂起时间,默认15s,该参数是消费者传递的
    final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;
    //获取topic配置
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    if (null == topicConfig) 
        log.error("the topic  not exist, consumer: ", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
        return response;
    
    //topic是否可读,不可读则直接返回
    if (!PermName.isReadable(topicConfig.getPerm())) 
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
        return response;
    
    //校验请求中的队列id,如果小于0或者大于等于topic配置中的读队列数量,那么直接返回
    if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) 
        String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]", requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
        log.warn(errorInfo);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(errorInfo);
        return response;
    
    /*
     * 1 构建过滤信息
     * 真正的过滤消息操作还在后面,而且broker和consumer都会进行过滤
     */
    SubscriptionData subscriptionData = null;
    ConsumerFilterData consumerFilterData = null;
    //如果有子订阅标记,那么每次拉取消息都会重新构建subscriptionData和consumerFilterData,而不是使用缓存的信息,一般都是false
    //因为hasSubscriptionFlag为true需要consumer端将postSubscriptionWhenPull=true,并且订阅不是classFilter模式同时满足
    if (hasSubscriptionFlag) 
        try 
            subscriptionData = FilterAPI.build(requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType());
            if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) 
                consumerFilterData = ConsumerFilterManager.build(requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(), requestHeader.getExpressionType(), requestHeader.getSubVersion());
                assert consumerFilterData != null;
            
         catch (Exception e) 
            log.warn("Parse the consumer's subscription[] failed, group: ", requestHeader.getSubscription(), requestHeader.getConsumerGroup());
            response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
            response.setRemark("parse the consumer's subscription failed");
            return response;
        
     else 
        //获取消费者组信息
        ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
        if (null == consumerGroupInfo) 
            log.warn("the consumer's group info not exist, group: ", requestHeader.getConsumerGroup());
            response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
            response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
            return response;
        
        //如果不支持广播消费但是消费者消费模式是广播消费,则直接返回
        if (!subscriptionGroupConfig.isConsumeBroadcastEnable() && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) 
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
            return response;
        
        //获取broker缓存的此consumerGroupInfo中关于此topic的订阅关系
        subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
        if (null == subscriptionData) 
            log.warn("the consumer's subscription not exist, group: , topic:", requestHeader.getConsumerGroup(), requestHeader.getTopic());
            response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
            response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
            return response;
        
        //比较订阅关系版本
        if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) 
            log.warn("The broker's subscription is not latest, group:  ", requestHeader.getConsumerGroup(), subscriptionData.getSubString());
            response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
            response.setRemark("the consumer's subscription not latest");
            return response;
        
        //如果订阅关系表达式不是TAG类型,那么构建consumerFilterData
        if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) 
            consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(), requestHeader.getConsumerGroup());
            if (consumerFilterData == null) 
                response.setCode(ResponseCode.FILTER_DATA_NOT_EXIST);
                response.setRemark("The broker's consumer filter data is not exist!Your expression may be wrong!");
                return response;
            
            if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) 
                log.warn("The broker's consumer filter data is not latest, group: , topic: , serverV: , clientV: ", requestHeader.getConsumerGroup(), requestHeader.getTopic(), consumerFilterData.getClientVersion(), requestHeader.getSubVersion());
                response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST);
                response.setRemark("the consumer's consumer filter data not latest");
                return response;
            
        
    
    //如果订阅关系表达式不是TAG类型,并且enablePropertyFilter没有开启支持SQL92,那么抛出异常
    //也就是说,如果消费者使用SQL92模式订阅,那么需要现在broker端设置enablePropertyFilter=true
    if (!ExpressionType.isTagType(subscriptionData.getExpressionType()) && !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) 
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
        return response;
    

    MessageFilter messageFilter;
    //重试topic是否支持filter过滤,默认false,即重试topic是不支持过滤额
    if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) 
        messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData, this.brokerController.getConsumerFilterManager());
     else 
        //创建普通的ExpressionMessageFilter,内部保存了消费者启动时通过心跳上报的订阅关系
        //一般基于tag订阅的情况下,consumerFilterData是null,通过subscriptionData进行过滤
        messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData, this.brokerController.getConsumerFilterManager());
    

    /*
     * 2 通过DefaultMessageStore#getMessage方法批量拉取消息,并且进行过滤操作
     */
    final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
    /*
     * 3 对于拉取结果GetMessageResult进行处理
     */
    if (getMessageResult != null) 
        //设置拉去状态枚举名字
        response.setRemark(getMessageResult.getStatus().name());
        //设置下次拉取的consumeQueue的起始逻辑偏移量
        responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
        //设置consumeQueue的最小、最大的逻辑偏移量maxOffset和minOffset
        responseHeader.setMinOffset(getMessageResult.getMinOffset());
        responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
        /*
         * 3.1 判断并设置下次拉取消息的建议broker是MATER还是SLAVE
         */
        //是否建议从SLAVE拉取消息
        if (getMessageResult.isSuggestPullingFromSlave()) 
            //设置建议的brokerId为从服务器的id 1
            responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
         else 
            //否则,设置建议的brokerId为主服务器的id 0
            responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
        
        //判断broker角色
        switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) 
            case ASYNC_MASTER:
            case SYNC_MASTER:
                break;
            case SLAVE:
                //如果是SLAVE,并且从服务器不可读
                if (!this.brokerController.getBrokerConfig().以上是关于RocketMQ源码(19)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码一万字的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ源码(10)—Broker asyncSendMessage处理消息以及自动创建Topic

RocketMQ源码(10)—Broker asyncSendMessage处理消息以及自动创建Topic

RocketMQ源码(20)—DefaultMQPushConsumer处理Broker的拉取消息响应源码

RocketMQ源码(20)—DefaultMQPushConsumer处理Broker的拉取消息响应源码

RocketMQ源码(11)—Broker asyncPutMessage处理消息以及存储的高性能设计一万字

RocketMQ源码(11)—Broker asyncPutMessage处理消息以及存储的高性能设计一万字