RocketMQ Consumer消息消费过程queue均衡
Posted 乐观男孩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ Consumer消息消费过程queue均衡相关的知识,希望对你有一定的参考价值。
说明
关于RocketMQ Consumer消费消息的过程,内容较多,为了能将消费过程中涉及的关键技术讲全、讲明白,所以这一部分,根据消费过程的主要处理内容,我将分三个小部分进行:
1、queue的均衡:针对topic的queue,如何分配给不同的Consumer进行消费,分配的算法有哪些。
2、消息的拉取:拉取消息的过程及消息处理。
3、消息的消费:对拉取到的消息推送给Consumer处理。
这一节,我们先学习queue的均衡流程。
均衡过程
为了让大家对queue均衡过程先有一个总的认识,这里先上一张均衡过程的流程图:
queue均衡入口
在《Consumer启动过程》一节中,我们已经知道:在Consumer启动的过程中,其中有一步是对MQClientInstance的启动(由于启动过程与Producer启动过程一致,所以在Consumer启动过程中没有细说,可直接参考《Producer启动过程》中的MQClientInstance的启动过程)。在MQClientInstance的启动的过程中,又会对RebalanceService组件的启动。RebalanceService组件的启动,就是触发queue均衡的入口,也是触发Consumer进行消息消费的入口。
RebalanceService是一个线程任务,RebalanceService组件的启动,最终启动的是一个执行均衡任务的线程:
线程启动后,默认每隔20s执行一次均衡任务,直到RebalanceService组件停止为止。
均衡过程
下面,我们跟着源码了解queue均衡的过程。
1、MQClientInstance.doRebalance():直接调用DefaultMQPushConsumerImpl.doRebalance()进行处理:
2、DefaultMQPushConsumerImpl.doRebalance():直接调用RebalancePushImpl.doRebalance()进行处理:
3、RebalancePushImpl.doRebalance():对当前Consumer订阅的Topic,逐一调用当前类的rebalanceByTopic方法进行处理:
rebalanceByTopic方法分广播和集群两种消费模式进行queue均衡,这里我们先关注集群消费模式的均衡过程。
4、RebalancePushImpl.rebalanceByTopic(…):只要分以下几个步骤进行处理:
(1)、首先根据Topic和ConsumerGroup从Broker中获取获取监听该Topic的所有ConuserId
(2)、对MessageQueue和ConuserId进行升序排序,然后调用AllocateMessageQueueStrategy.allocate(…)方法进行处理(allocateMessageQueueStrategy在DefaultMQPushConsumer初始化,默认使用的是AllocateMessageQueueAveragely):
(3)、AllocateMessageQueueStrategy.allocate(…):根据计算规则,均衡后将当前Consumer需要消费的MessageQueue返回:
均衡规则:
a、如果queue的数量≤Consumer的数量,则一个Consumer最多消费一个queue的消息(有可能有的Consumer不用消费)。
b、如果queue的数量大于Consumer的数量,则计算每个Consumer平均(queue数量/Consumer数量余数前的Consumer)需要消费的queue数量。如果有余数,则前面的每个Consumer会消费多一个queue的消息。根据计算每个Consumer需要消费的queue数量,顺序将queue分配到对应的Consumer中。
(4)、均衡完之后,将均衡的结果更新到该类的processQueueTable属性中:
a、更新本地的processQueueTable属性中。该属性维护了MessageQueue与ProcessQueue的对应关系,在消息消费过程会使用到。
过期判断规则:
b、针对需要新增消费的queue,则增加消息消费任务。
调用DefaultMQPushConsumerImpl.executePullRequestImmediately()
DefaultMQPushConsumerImpl将拉取消息的请求委托给PullMessageService
PullMessageService将拉取消息的请求PullRequest放入队列,由线程池执行消息拉取任务。
至此,均衡任务已经完成(增加消息消费任务属于消费过程,放到下一节学习)。
均衡规则:
RocketMQ提供了丰富的queue均衡规则,具体有以下几种:
1、AllocateMessageQueueAveragely:默认均衡规则,以上已经讲解。
2、AllocateMachineRoomNearby:根据Consumer与Broker的距离远近进行分配,从源码看,该策略未完整实现。
3、AllocateMessageQueueAveragelyByCircle:循环平均分配。是第1种方式的变种。针对queue数量多余Consumer数量的情况下,使用循环分配规则。如有3个Consumer、5个queue,则Consumer0消费queue0和queue3、Consumer1消费queue1和queue4、Consumer2消费queue2。
4、AllocateMessageQueueByConfig:根据配置进行分配。未实现。
5、AllocateMessageQueueByMachineRoom:机房分配策略。
6、AllocateMessageQueueConsistentHash:一致性Hash方式分配。
以上是关于RocketMQ Consumer消息消费过程queue均衡的主要内容,如果未能解决你的问题,请参考以下文章