RocketMQ源码(17)—RebalanceService消费者负载均衡过程源码

Posted 刘Java

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码(17)—RebalanceService消费者负载均衡过程源码相关的知识,希望对你有一定的参考价值。

基于RocketMQ release-4.9.3,深入的介绍了消费者负载均衡服务RebalanceService的具体负载均衡过程的源码。

上一篇文章我们学习了RocketMQ源码(16)—消费者负载均衡服务RebalanceService入口源码。现在我们将会介绍具体的负载的过程源码。

文章目录

1 doRebalance执行重平衡

负载均衡or重平衡的触发操作,最终都会执行MQClientInstance的doRebalance方法。该方法将会遍历consumerTable,获取每一个消费者MQConsumerInner,即DefaultMQPushConsumerImpl或者其他实例,然后通过消费者本身来执行重平衡操作。

/**
* MQClientInstance的方法
* <p>
* 执行重平衡
*/
public void doRebalance() 
    //遍历consumerTable
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) 
        //获取一个消费者,即DefaultMQPushConsumerImpl或者其他实例
        MQConsumerInner impl = entry.getValue();

        if (impl != null) 
            try 
                //通过消费者本身来执行重平衡操作
                impl.doRebalance();
             catch (Throwable e) 
                log.error("doRebalance exception", e);
            
        
    

MQConsumerInner有三种实现,分别是DefaultLitePullConsumerImpl、DefaultMQPullConsumerImpl、DefaultMQPushConsumerImpl,前两个都用的很少,他们的doRebalance源码也都很简单,即调用各自内部的rebalanceImpl#doRebalance(false)方法即可。

@Override
public void doRebalance() 
    if (this.rebalanceImpl != null) 
        this.rebalanceImpl.doRebalance(false);
    

我们最常使用的是DefaultMQPushConsumerImpl,它的doRebalance方法也很简单,如果该消费者没有暂停,那么同样调用rebalanceImpl#doRebalance方法即可。

/**
 * DefaultMQPushConsumerImpl的方法
 * <p>
 * 执行重平衡
 */
@Override
public void doRebalance() 
    //如果服务没有暂停,那么调用rebalanceImpl执行重平衡
    if (!this.pause) 
        //isConsumeOrderly表示是否是顺序消费
        this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
    

2 RebalanceImpl#doRebalance执行重平衡

该方法将会获取当前消费者的订阅信息集合,然后遍历订阅信息集合,获取订阅的topic,调用rebalanceByTopic方法对该topic进行重平衡。

/**
 * RebalanceImpl的方法
 * <p>
 * 执行重平衡
 *
 * @param isOrder 是否顺序消费
 */
public void doRebalance(final boolean isOrder) 
    //获取当前消费者的订阅信息集合
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) 
        //遍历订阅信息集合
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) 
            //获取topic
            final String topic = entry.getKey();
            try 
                /*
                 * 对该topic进行重平衡
                 */
                this.rebalanceByTopic(topic, isOrder);
             catch (Throwable e) 
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) 
                    log.warn("rebalanceByTopic Exception", e);
                
            
        
    
    /*
     * 丢弃不属于当前消费者订阅的topic的队列快照ProcessQueue
     */
    this.truncateMessageQueueNotMyTopic();

3 rebalanceByTopic根据topic执行重平衡

该方法根据topic进行重平衡,将会根据不同的消息模式执行不同的处理策略。

  1. 如果是广播模式,广播模式下并没有负载均衡可言,每个consumer都会消费所有队列中的全部消息,仅仅是更新当前consumer的处理队列processQueueTable的信息。
  2. 如果是集群模式,首先基于负载均衡策略确定分配给当前消费者的MessageQueue,然后更新当前consumer的处理队列processQueueTable的信息。

集群模式的大概步骤为:

  1. 首先获取该topic的所有消息队列集合mqSet,随后从topic所在的broker中获取当前consumerGroup的clientId集合,即消费者客户端id集合cidAll。一个clientId代表一个消费者。
  2. 对topic的消息队列和clientId集合分别进行排序。排序能够保证,不同的客户端消费者在进行负载均衡时,其mqAll和cidAll中的元素顺序是一致的。
  3. 获取分配消息队列的策略实现AllocateMessageQueueStrategy,即负载均衡的策略类,执行allocate方法,为当前clientId也就是当前消费者,分配消息队列,这一步就是执行负载均衡或者说重平衡的算法。
  4. 调用updateProcessQueueTableInRebalance方法,更新新分配的消息队列的处理队列processQueueTable的信息,为新分配的消息队列创建最初的pullRequest并分发给PullMessageService。
  5. 如果processQueueTable发生了改变,那么调用messageQueueChanged方法。设置新的本地订阅关系版本,重设流控参数,立即给所有broker发送心跳,让Broker更新当前订阅关系。
/**
 * RebalanceImpl的方法
 * <p>
 * 根据topic进行重平衡
 */
private void rebalanceByTopic(final String topic, final boolean isOrder) 
    //根据不同的消息模式执行不同的处理策略
    switch (messageModel) 
        /*
         * 广播模式的处理
         * 广播模式下并没有负载均衡可言,每个consumer都会消费所有队列中的全部消息,仅仅是更新当前consumer的处理队列processQueueTable的信息
         */
        case BROADCASTING: 
            //获取topic的消息队列
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            if (mqSet != null) 
                /*
                 * 直接更新全部消息队列的处理队列processQueueTable的信息,创建最初的pullRequest并分发给PullMessageService
                 */
                boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                //如果processQueueTable发生了改变
                if (changed) 
                    /*
                     * 设置新的本地订阅关系版本,重设流控参数,立即给所有broker发送心跳,让Broker更新当前订阅关系
                     */
                    this.messageQueueChanged(topic, mqSet, mqSet);
                    log.info("messageQueueChanged    ",
                            consumerGroup,
                            topic,
                            mqSet,
                            mqSet);
                
             else 
                log.warn("doRebalance, , but the topic[] not exist.", consumerGroup, topic);
            
            break;
        
        /*
         * 集群模式的处理
         * 基于负载均衡策略确定跟配给当前消费者的MessageQueue,然后更新当前consumer的处理队列processQueueTable的信息
         */
        case CLUSTERING: 
            //获取topic的消息队列
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            /*
             * 从topic所在的broker中获取当前consumerGroup的clientId集合,即消费者客户端id集合
             * 一个clientId代表一个消费者
             */
            List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
            if (null == mqSet) 
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) 
                    log.warn("doRebalance, , but the topic[] not exist.", consumerGroup, topic);
                
            

            if (null == cidAll) 
                log.warn("doRebalance,  , get consumer id list failed", consumerGroup, topic);
            

            if (mqSet != null && cidAll != null) 
                //将topic的消息队列存入list集合中
                List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                mqAll.addAll(mqSet);
                /*
                 * 对topic的消息队列和clientId集合分别进行排序
                 * 排序能够保证,不同的客户端消费者在进行负载均衡时,其mqAll和cidAll中的元素顺序是一致的
                 */
                Collections.sort(mqAll);
                Collections.sort(cidAll);
                //获取分配消息队列的策略实现,即负载均衡的策略类
                AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                List<MessageQueue> allocateResult = null;
                try 
                    /*
                     * 为当前clientId也就是当前消费者,分配消息队列
                     * 这一步就是执行负载均衡或者说重平衡的算法
                     */
                    allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                 catch (Throwable e) 
                    log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName=", strategy.getName(),
                            e);
                    return;
                
                //对消息队列去重
                Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                if (allocateResult != null) 
                    allocateResultSet.addAll(allocateResult);
                
                /*
                 * 更新新分配的消息队列的处理队列processQueueTable的信息,创建最初的pullRequest并分发给PullMessageService
                 */
                boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                //如果processQueueTable发生了改变
                if (changed) 
                    log.info(
                            "rebalanced result changed. allocateMessageQueueStrategyName=, group=, topic=, clientId=, mqAllSize=, cidAllSize=, rebalanceResultSize=, rebalanceResultSet=",
                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                            allocateResultSet.size(), allocateResultSet);
                    /*
                     * 设置新的本地订阅关系版本,重设流控参数,立即给所有broker发送心跳,让Broker更新当前订阅关系
                     */
                    this.messageQueueChanged(topic, mqSet, allocateResultSet);
                
            
            break;
        
        default:
            break;
    

4 findConsumerIdList查找客户端id集合

该方法从topic所在的broker中获取当前consumerGroup的clientId集合,即消费者客户端id集合,用于后续负载均衡策略。一个cliendId代表着一个消费者。

首先通过findBrokerAddrByTopic方法随机选择一个当前topic所属的broker,如果broker地址为null则请求nameserver更新topic路由信息。然后调用getConsumerIdListByGroup方法根据brokerAddr和group 发起请求到broekr,得到消费者客户端id列表。

从这里的源码能够看出来,RocketMQ一个消费者组内的消费者订阅的topic都必须一致,否则就会出现订阅的topic被覆盖的情况。

/**
 * MQClientInstance的方法
 * <p>
 * 从topic所在的broker中获取当前consumerGroup的clientId集合,即消费者客户端id集合
 */
public List<String> findConsumerIdList(final String topic, final String group) 
    //随机选择一个当前topic所属的broker
    String brokerAddr = this.findBrokerAddrByTopic(topic);
    if (null == brokerAddr) 
        //如果broker地址为null则请求nameserver更新topic路由信息
        this.updateTopicRouteInfoFromNameServer(topic);
        brokerAddr = this.findBrokerAddrByTopic(topic);
    

    if (null != brokerAddr) 
        try 
            //根据brokerAddr和group 得到消费者客户端id列表
            return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, clientConfig.getMqClientApiTimeout());
         catch (Exception e) 
            log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
        
    

    return null;

4.1 findBrokerAddrByTopic随机查找broker

从topicRouteTable中获取topic路由信息,然后随机选择一个broker返回。为社么随机返回就可以呢?因为consumer会向所有broker上报心跳信息,因此这些broker中的客户端id是一致的。并且,RocketMQ默认一个消费者组的所有消费的订阅信息都是一致的,因此随便哪个broker上关于此Group所有ConsumerId集合都是一样的。

/**
 * MQClientInstance的方法
 * <p>
 * 随机选取指定topic的一个broker
 */
public String findBrokerAddrByTopic(final String topic) 
    //获取topic路由信息
    TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
    if (topicRouteData != null) 
        //获取全部broker地址数据
        List<BrokerData> brokers = topicRouteData.getBrokerDatas();
        if (!brokers.isEmpty()) 
            //随机选择一个broker返回
            int index = random.nextInt(brokers.size());
            BrokerData bd = brokers.get(index % brokers.size());
            return bd.selectBrokerAddr();
        
    

    return null;

4.2 getConsumerIdListByGroup获取Group所有ConsumerId集合

该方法向指定地址的broker发起网络请求,查找指定group的全部消费者客户端id列表并返回。请求Code为GET_CONSUMER_LIST_BY_GROUP

/**
 * MQClientAPIImpl的方法
 * 根据brokerAddr和group 得到消费者客户端id列表
 */
public List<String> getConsumerIdListByGroup(
        final String addr,
        final String consumerGroup,
        final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
        MQBrokerException, InterruptedException 
    //构建请求头
    GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader();
    requestHeader.setConsumerGroup(consumerGroup);
    //构建请求命令对象,Code为GET_CONSUMER_LIST_BY_GROUP
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader);
    //发起同步调用
    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
            request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) 
        case ResponseCode.SUCCESS: 
            if (response.getBody() != null) 
                //响应解码
                GetConsumerListByGroupResponseBody body =
                        GetConsumerListByGroupResponseBody.decode(response.getBody(), GetConsumerListByGroupResponseBody.class);
                //返回客户端id集合
                return body.getConsumerIdList();
            
        
        default:
            break;
    

    throw new MQBrokerException(response.getCode(), response.getRemark(), addr);

4.2.1 broker处理getConsumerListByGroup请求

broker通过ConsumerManageProcessor对于处理GET_CONSUMER_LIST_BY_GROUP的请求。

/**
 * ConsumerManageProcessor的方法
 */
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException 
    switch (request.getCode()) 
        case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
            //返回指定group的所有客户端id集合
            return this.getConsumerListByGroup(ctx, request);
        case RequestCode.UPDATE_CONSUMER_OFFSET:
            return this.updateConsumerOffset(ctx, request);
        case RequestCode.QUERY_CONSUMER_OFFSET:
            return this.queryConsumerOffset(ctx, request);
        default:
            break;
    
    return null;

4.2.2 ConsumerManageProcessor#getConsumerListByGroup

返回指定group的所有客户端id集合。

/**
 * ConsumerManageProcessor的方法
 * <p>
 * 返回指定group的所有客户端id集合
 */
public RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException 
    //创建响应命令对象
    final RemotingCommand response =
            RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
    //解析请求头
    final GetConsumerListByGroupRequestHeader requestHeader =
            (GetConsumerListByGroupRequestHeader) request
                    .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
    //从broker的consumerTable中获取指定group的消费者组信息
    ConsumerGroupInfo consumerGroupInfo =
            this.brokerController.getConsumerManager().getConsumerGroupInfo(
                    requestHeader.getConsumerGroup());

    if (consumerGroupInfo != null) 
        //获取所有客户端id集合
        List<String> clientIds = consumerGroupInfo.getAllClientId();
        if (!clientIds.isEmpty()) 
            GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody();
            body.setConsumerIdList(clientIds);
            response.setBody(body.encode());
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
         else 
            log.warn("getAllClientId failed,  ", requestHeader.

以上是关于RocketMQ源码(17)—RebalanceService消费者负载均衡过程源码的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ源码—RocketMQ源码调试环境准备

源码分析RocketMQ系列索引

RocketMQ 源码合集

RocketMQ 源码合集

SpringBoot(17)---SpringBoot整合RocketMQ

#yyds干货盘点# 一文带你 RocketMQ 源码调试环境搭建