rocketMq - rebalance介绍
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rocketMq - rebalance介绍相关的知识,希望对你有一定的参考价值。
参考技术A rocketMq概念介绍rocketMq-namesrv介绍
rocketMq-Topic创建过程
rocketMq-producer介绍
rocketMq-consumer介绍
rocketMq - rebalance介绍
rocketMq - 并发消费过程
rocketMq - 串行消费过程
rocketMq-broker介绍
rocketMq-broker消息存储介绍
rocketMq - commitLog
rocketMq - index介绍
rocketMq-延迟消息介绍
rocketMq-事务消息介绍
rocketMq消息查询
rocketMq和kafka的架构区别
rocketMq - master/slave同步
本章主要讲解Rebalance在consumer端的作用,如果要理解consumer的逻辑,就必须要知道在consumer端有Rebalance这个服务,没有Rebalance也就没有consumer的消息拉取。
1、从namesrv获取messageQueue信息
2、从broker获取consumer信息
3、选择Rebalance策略
4、三者结合实现Rebalance操作
Rebalance是针对Topic+ConsumerGroup进行Rebalance的,在我们创建的comsumer过程中会订阅topic(包括%retry%consumerGroup),Rebalance就是要这些Topic下的所有messageQueue按照一定的规则分发给consumerGroup下的consumer进行消费。
说明:参见RebalanceImpl类
1、着重需要强调的概念,Rebalance是针对订阅的topic进行Rebalance,也就是假如consumer订阅了10个Topic,那么我们就需要对10个Topic里的每一个Topic进行Rebalance。
说明:参见RebalanceImpl类
1、Rebalance的过程需要3个要素,分别是Topic下的所有MessageQueue、consumerGroup下的所有consumer、Rebalance策略。这里的MessageQueue是指Topic在每个broker上的队列配置信息。
2、获取MessageQueue信息,获取consumerGroup下的consumer信息,根据Rebalance策略进行Rebalance。
3、更新Rebalance的结果进行消息的拉取。
4、Rebalance更新consumer负责的messageQueue
说明:参见RebalanceImpl类
说明:参见RebalanceImpl类
说明:参见AllocateMessageQueueAveragely类
1、举其中一种策略说明,这个策略是考虑当前consumerId的位置,consumer的数量,MessageQueue的数量,根据consumerId所处的位置决定分配多少消费队列。
2、该过程会动态调整,也可能会不一致,因为依赖的数据来自broker会有不一致,但是最终肯定会一致,周期性的Rebalance的作用。
说明:参见MQClientInstance类
1、consumer端的MessageQueue是根据topic中的readQueueNums来计算的
2、计算MessageQueue的TopicRouteData是从namesrv中获取的
说明:参见MQClientInstance类
参见:MQClientAPIImpl类
说明:参见ConsumerManageProcessor类
1、获取consumer列表跟注册过程是对称的
要知道consumer的获取必须知道consumer是怎么注册的,其实consumer会把注册信息发送给broker保存,当然由于没有强一致性的保证,会存在某些极端情况下broker上的配置不一致,但是由于这是一个周期性的任务,所以最终肯定会达到一致的。
说明:参见DefaultMQPushConsumerImpl类
说明:参见MQClientInstance类
1、心跳信息包含client的注册信息
2、同步给broker的信息是最终一致性的,非强一致性
说明:参见ClientManageProcessor
1、处理consumer心跳信息的入口
说明:参见ConsumerManager类
1、broker保存consumer元信息
Kafka 0.11版本新功能介绍 —— 空消费组延时rebalance
在0.11之前的版本中,多个consumer实例加入到一个空消费组将导致多次的rebalance,这是由于每个consumer instance启动的时间不可控,很有可能超出coordinator确定的rebalance timeout(即max.poll.interval.ms),而每次rebalance的代价又相当地大,因为很多状态都需要在rebalance前被持久化,而在rebalance后被重新初始化。曾经有个国外用户,他有100个consumer,每次rebalance的时间甚至要1个小时以上!
对于目前版本的Kafka来说,consumer的rebalance的确有需要需要改进的部分,很容易想到的包括:
- 对于空消费组而言,能够稍稍放松rebalance的严苛条件,让rebalance发生的次数降低一些
- 对于有大量成员的消费组而言,是否可以考虑以前的分配方案而不是像现在这样完全抛弃之前方案从头来过
值得高兴的是,社区已经实现了第一个改进并将其集成进0.11.0.0版本中,也就是说用户在升级到0.11后便可以体验到这种延时rebalance的效果,主要表现为空消费组从EMPTY到STABLE的时间间隔应该显著缩短。本文将简要介绍一下该新功能以及实现原理。
新增参数:group.initial.rebalance.delay.ms
对于用户来说,这个改进最直接的效果就是新增了一个broker配置:group.initial.rebalance.delay.ms,默认是3秒钟。用户需要在server.properties文件中自行修改为想要配置的值。这个参数的主要效果就是让coordinator推迟空消费组接收到成员加入请求后本应立即开启的rebalance。在实际使用时,假设你预估你的所有consumer组成员加入需要在10s内完成,那么你就可以设置该参数=10000。目前来看,这个参数的使用还是很方便的~
coordinator底层修改
为了实现这一功能,需要修改一些底层的设计。首先,对于消费组状态而言,之前的文章中讨论过,当前的状态机如下图所示:
由上图可见,Empty到PreparingRebalance的转化就是发生有成员加入之后。现在在这两个状态之间新增了一个状态:InitialRebalance。那么对于一个空的消费组而言,当第一个成员加入时,组状态会进入到InitialRebalance,同时对这个JoinGroup请求的处理可能会推迟一段时间,但这段时间不会超过rebalance超时时间和group.initial.rebalance.delay.ms两者的小者。之后倘若又有一个新成员加入组,那么仍然按照之前的逻辑,组状态是InitialRebalance,但此时这个请求被推迟的最大时间将会更新为min(剩下的rebalance超时时间,group.initial.rebalance.delay.ms)。这个剩余rebalane超时=初始rebalance超时- N * group.initial.rebalance.delay.ms,N表示前面已经发生过的N次成员加入。改进后的组状态机如下图所示:
当剩余rebalance超时变更成0时,即认为延时已经过期了,因此coordinator会将消费组状态变更成PreparingRebalance,下面的事情就和之前的流程一致了。至于这些请求是如何在broker端被延时处理的,其实这也要归功于DelayedJoin以及底层的purgatory机制了,有时间的话跟大家详细说说它的设计。
以上是关于rocketMq - rebalance介绍的主要内容,如果未能解决你的问题,请参考以下文章
Kafka 0.11版本新功能介绍 —— 空消费组延时rebalance