RocketMQ源码(17)—RebalanceService消费者负载均衡过程源码
Posted 刘Java
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码(17)—RebalanceService消费者负载均衡过程源码相关的知识,希望对你有一定的参考价值。
基于RocketMQ release-4.9.3,深入的介绍了消费者负载均衡服务RebalanceService的具体负载均衡过程的源码。
上一篇文章我们学习了RocketMQ源码(16)—消费者负载均衡服务RebalanceService入口源码。现在我们将会介绍具体的负载的过程源码。
文章目录
- 1 doRebalance执行重平衡
- 2 RebalanceImpl#doRebalance执行重平衡
- 3 rebalanceByTopic根据topic执行重平衡
- 4 findConsumerIdList查找客户端id集合
- 5 allocate分配消息队列
- 6 updateProcessQueueTableInRebalance更新处理队列
- 7 messageQueueChanged更新消息队列
- 8 总结
1 doRebalance执行重平衡
负载均衡or重平衡的触发操作,最终都会执行MQClientInstance的doRebalance方法。该方法将会遍历consumerTable,获取每一个消费者MQConsumerInner,即DefaultMQPushConsumerImpl或者其他实例,然后通过消费者本身来执行重平衡操作。
/**
* MQClientInstance的方法
* <p>
* 执行重平衡
*/
public void doRebalance()
//遍历consumerTable
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet())
//获取一个消费者,即DefaultMQPushConsumerImpl或者其他实例
MQConsumerInner impl = entry.getValue();
if (impl != null)
try
//通过消费者本身来执行重平衡操作
impl.doRebalance();
catch (Throwable e)
log.error("doRebalance exception", e);
MQConsumerInner有三种实现,分别是DefaultLitePullConsumerImpl、DefaultMQPullConsumerImpl、DefaultMQPushConsumerImpl,前两个都用的很少,他们的doRebalance源码也都很简单,即调用各自内部的rebalanceImpl#doRebalance(false)方法即可。
@Override
public void doRebalance()
if (this.rebalanceImpl != null)
this.rebalanceImpl.doRebalance(false);
我们最常使用的是DefaultMQPushConsumerImpl,它的doRebalance方法也很简单,如果该消费者没有暂停,那么同样调用rebalanceImpl#doRebalance方法即可。
/**
* DefaultMQPushConsumerImpl的方法
* <p>
* 执行重平衡
*/
@Override
public void doRebalance()
//如果服务没有暂停,那么调用rebalanceImpl执行重平衡
if (!this.pause)
//isConsumeOrderly表示是否是顺序消费
this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
2 RebalanceImpl#doRebalance执行重平衡
该方法将会获取当前消费者的订阅信息集合,然后遍历订阅信息集合,获取订阅的topic,调用rebalanceByTopic方法对该topic进行重平衡。
/**
* RebalanceImpl的方法
* <p>
* 执行重平衡
*
* @param isOrder 是否顺序消费
*/
public void doRebalance(final boolean isOrder)
//获取当前消费者的订阅信息集合
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null)
//遍历订阅信息集合
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet())
//获取topic
final String topic = entry.getKey();
try
/*
* 对该topic进行重平衡
*/
this.rebalanceByTopic(topic, isOrder);
catch (Throwable e)
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))
log.warn("rebalanceByTopic Exception", e);
/*
* 丢弃不属于当前消费者订阅的topic的队列快照ProcessQueue
*/
this.truncateMessageQueueNotMyTopic();
3 rebalanceByTopic根据topic执行重平衡
该方法根据topic进行重平衡,将会根据不同的消息模式执行不同的处理策略。
- 如果是广播模式,广播模式下并没有负载均衡可言,每个consumer都会消费所有队列中的全部消息,仅仅是更新当前consumer的处理队列processQueueTable的信息。
- 如果是集群模式,首先基于负载均衡策略确定分配给当前消费者的MessageQueue,然后更新当前consumer的处理队列processQueueTable的信息。
集群模式的大概步骤为:
- 首先获取该topic的所有消息队列集合mqSet,随后从topic所在的broker中获取当前consumerGroup的clientId集合,即消费者客户端id集合cidAll。一个clientId代表一个消费者。
- 对topic的消息队列和clientId集合分别进行排序。排序能够保证,不同的客户端消费者在进行负载均衡时,其mqAll和cidAll中的元素顺序是一致的。
- 获取分配消息队列的策略实现AllocateMessageQueueStrategy,即负载均衡的策略类,执行allocate方法,为当前clientId也就是当前消费者,分配消息队列,这一步就是执行负载均衡或者说重平衡的算法。
- 调用updateProcessQueueTableInRebalance方法,更新新分配的消息队列的处理队列processQueueTable的信息,为新分配的消息队列创建最初的pullRequest并分发给PullMessageService。
- 如果processQueueTable发生了改变,那么调用messageQueueChanged方法。设置新的本地订阅关系版本,重设流控参数,立即给所有broker发送心跳,让Broker更新当前订阅关系。
/**
* RebalanceImpl的方法
* <p>
* 根据topic进行重平衡
*/
private void rebalanceByTopic(final String topic, final boolean isOrder)
//根据不同的消息模式执行不同的处理策略
switch (messageModel)
/*
* 广播模式的处理
* 广播模式下并没有负载均衡可言,每个consumer都会消费所有队列中的全部消息,仅仅是更新当前consumer的处理队列processQueueTable的信息
*/
case BROADCASTING:
//获取topic的消息队列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null)
/*
* 直接更新全部消息队列的处理队列processQueueTable的信息,创建最初的pullRequest并分发给PullMessageService
*/
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
//如果processQueueTable发生了改变
if (changed)
/*
* 设置新的本地订阅关系版本,重设流控参数,立即给所有broker发送心跳,让Broker更新当前订阅关系
*/
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged ",
consumerGroup,
topic,
mqSet,
mqSet);
else
log.warn("doRebalance, , but the topic[] not exist.", consumerGroup, topic);
break;
/*
* 集群模式的处理
* 基于负载均衡策略确定跟配给当前消费者的MessageQueue,然后更新当前consumer的处理队列processQueueTable的信息
*/
case CLUSTERING:
//获取topic的消息队列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
/*
* 从topic所在的broker中获取当前consumerGroup的clientId集合,即消费者客户端id集合
* 一个clientId代表一个消费者
*/
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet)
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))
log.warn("doRebalance, , but the topic[] not exist.", consumerGroup, topic);
if (null == cidAll)
log.warn("doRebalance, , get consumer id list failed", consumerGroup, topic);
if (mqSet != null && cidAll != null)
//将topic的消息队列存入list集合中
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
/*
* 对topic的消息队列和clientId集合分别进行排序
* 排序能够保证,不同的客户端消费者在进行负载均衡时,其mqAll和cidAll中的元素顺序是一致的
*/
Collections.sort(mqAll);
Collections.sort(cidAll);
//获取分配消息队列的策略实现,即负载均衡的策略类
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try
/*
* 为当前clientId也就是当前消费者,分配消息队列
* 这一步就是执行负载均衡或者说重平衡的算法
*/
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
catch (Throwable e)
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName=", strategy.getName(),
e);
return;
//对消息队列去重
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null)
allocateResultSet.addAll(allocateResult);
/*
* 更新新分配的消息队列的处理队列processQueueTable的信息,创建最初的pullRequest并分发给PullMessageService
*/
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
//如果processQueueTable发生了改变
if (changed)
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName=, group=, topic=, clientId=, mqAllSize=, cidAllSize=, rebalanceResultSize=, rebalanceResultSet=",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
/*
* 设置新的本地订阅关系版本,重设流控参数,立即给所有broker发送心跳,让Broker更新当前订阅关系
*/
this.messageQueueChanged(topic, mqSet, allocateResultSet);
break;
default:
break;
4 findConsumerIdList查找客户端id集合
该方法从topic所在的broker中获取当前consumerGroup的clientId集合,即消费者客户端id集合,用于后续负载均衡策略。一个cliendId代表着一个消费者。
首先通过findBrokerAddrByTopic方法随机选择一个当前topic所属的broker,如果broker地址为null则请求nameserver更新topic路由信息。然后调用getConsumerIdListByGroup方法根据brokerAddr和group 发起请求到broekr,得到消费者客户端id列表。
从这里的源码能够看出来,RocketMQ一个消费者组内的消费者订阅的topic都必须一致,否则就会出现订阅的topic被覆盖的情况。
/**
* MQClientInstance的方法
* <p>
* 从topic所在的broker中获取当前consumerGroup的clientId集合,即消费者客户端id集合
*/
public List<String> findConsumerIdList(final String topic, final String group)
//随机选择一个当前topic所属的broker
String brokerAddr = this.findBrokerAddrByTopic(topic);
if (null == brokerAddr)
//如果broker地址为null则请求nameserver更新topic路由信息
this.updateTopicRouteInfoFromNameServer(topic);
brokerAddr = this.findBrokerAddrByTopic(topic);
if (null != brokerAddr)
try
//根据brokerAddr和group 得到消费者客户端id列表
return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, clientConfig.getMqClientApiTimeout());
catch (Exception e)
log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
return null;
4.1 findBrokerAddrByTopic随机查找broker
从topicRouteTable中获取topic路由信息,然后随机选择一个broker返回。为社么随机返回就可以呢?因为consumer会向所有broker上报心跳信息,因此这些broker中的客户端id是一致的。并且,RocketMQ默认一个消费者组的所有消费的订阅信息都是一致的,因此随便哪个broker上关于此Group所有ConsumerId集合都是一样的。
/**
* MQClientInstance的方法
* <p>
* 随机选取指定topic的一个broker
*/
public String findBrokerAddrByTopic(final String topic)
//获取topic路由信息
TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
if (topicRouteData != null)
//获取全部broker地址数据
List<BrokerData> brokers = topicRouteData.getBrokerDatas();
if (!brokers.isEmpty())
//随机选择一个broker返回
int index = random.nextInt(brokers.size());
BrokerData bd = brokers.get(index % brokers.size());
return bd.selectBrokerAddr();
return null;
4.2 getConsumerIdListByGroup获取Group所有ConsumerId集合
该方法向指定地址的broker发起网络请求,查找指定group的全部消费者客户端id列表并返回。请求Code为GET_CONSUMER_LIST_BY_GROUP。
/**
* MQClientAPIImpl的方法
* 根据brokerAddr和group 得到消费者客户端id列表
*/
public List<String> getConsumerIdListByGroup(
final String addr,
final String consumerGroup,
final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
MQBrokerException, InterruptedException
//构建请求头
GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
//构建请求命令对象,Code为GET_CONSUMER_LIST_BY_GROUP
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader);
//发起同步调用
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode())
case ResponseCode.SUCCESS:
if (response.getBody() != null)
//响应解码
GetConsumerListByGroupResponseBody body =
GetConsumerListByGroupResponseBody.decode(response.getBody(), GetConsumerListByGroupResponseBody.class);
//返回客户端id集合
return body.getConsumerIdList();
default:
break;
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
4.2.1 broker处理getConsumerListByGroup请求
broker通过ConsumerManageProcessor对于处理GET_CONSUMER_LIST_BY_GROUP的请求。
/**
* ConsumerManageProcessor的方法
*/
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException
switch (request.getCode())
case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
//返回指定group的所有客户端id集合
return this.getConsumerListByGroup(ctx, request);
case RequestCode.UPDATE_CONSUMER_OFFSET:
return this.updateConsumerOffset(ctx, request);
case RequestCode.QUERY_CONSUMER_OFFSET:
return this.queryConsumerOffset(ctx, request);
default:
break;
return null;
4.2.2 ConsumerManageProcessor#getConsumerListByGroup
返回指定group的所有客户端id集合。
/**
* ConsumerManageProcessor的方法
* <p>
* 返回指定group的所有客户端id集合
*/
public RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException
//创建响应命令对象
final RemotingCommand response =
RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
//解析请求头
final GetConsumerListByGroupRequestHeader requestHeader =
(GetConsumerListByGroupRequestHeader) request
.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
//从broker的consumerTable中获取指定group的消费者组信息
ConsumerGroupInfo consumerGroupInfo =
this.brokerController.getConsumerManager().getConsumerGroupInfo(
requestHeader.getConsumerGroup());
if (consumerGroupInfo != null)
//获取所有客户端id集合
List<String> clientIds = consumerGroupInfo.getAllClientId();
if (!clientIds.isEmpty())
GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody();
body.setConsumerIdList(clientIds);
response.setBody(body.encode());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
else
log.warn("getAllClientId failed, ", requestHeader.以上是关于RocketMQ源码(17)—RebalanceService消费者负载均衡过程源码的主要内容,如果未能解决你的问题,请参考以下文章