RocketMQ Consumer启动过程
Posted 乐观男孩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ Consumer启动过程相关的知识,希望对你有一定的参考价值。
目录
说明
RocketMQ消费者启动过程与生产者启动过程其实差不多,只是消费者在启动之前,会有一些比较重要前置的操作:订阅、注册消息监听器等。但整个过程来说,还是比较简单的,主要是对涉及的组件初始化、启动定时任务、初始化Netty客户端等。先来看一下消费者启动过程中涉及到的类的关系:
主要类功能说明:
1、DefaultMQPushConsumer:消费者相关操作(启动、注册消息监听器、订阅等)的总入口,依赖于DefaultMQPushConsumerImpl 实现相关的功能。
2、DefaultMQPushConsumerImpl :除了提供消息费相关的操作外,还封装了消息拉取、消费均衡、OffsetStore存储等功能,消费者的很多功能实现都在此类进行整合。
3、RebalanceImpl:对消费队列进行均衡(每个消费者该消费哪些队列)具体逻辑,均衡策略依赖于分配策略AllocateMessageQueueStrategy。
4、PullAPIWrapper:主要封装消息拉取及结果解析逻辑。
5、OffsetStore:消费者消费的OffsetStore存储策略,支持两种存储形式:如果是广播形式消费,则存储在本地;如果是集群形式,则存储在Broker上。
6、ConsumeMessageService:封装消费消息的模式:并发消费和按顺序消费。
7、MQClientInstance:主要是对MQClientAPIImpl的封装,对外提供统一的操作API。
初始化过程
在讲解启动过程之前,先了解一下初始化过程Consumer所做的工作,目的是大概了解关键信息存在哪里。初始化过程分三步:
1、DefaultMQPushConsumer初始化(new过程):初始化队列分配策略allocateMessageQueueStrategy=new AllocateMessageQueueAveragely():
初始化defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl():
2、DefaultMQPushConsumer订阅(defaultMQPushConsumer.subscribe(RocketMqUtil.TOPIC, “*”)):将订阅表达式解析成SubscriptionData对象,最终将topic与SubscriptionData的关系维护到RebalanceImpl对象中:
3、注册消息监听器(defaultMQPushConsumer.registerMessageListener(listener)):监听器最终维护在DefaultMQPushConsumer和DefaultMQPushConsumerImpl中:
启动过程
1、DefaultMQPushConsumer启动
该类的start()方法就是Consumer启动的入口。其主要干的工作有两点:
(1)、启动defaultMQPushConsumerImpl
(2)、如果启动了消息跟踪机制(构造函数通过enableMsgTrace指定),则启动TraceDispatcher。
2、DefaultMQPushConsumerImpl启动
(1)、将必要的属性设置到rebalanceImpl中(设置queue均衡策略,在Consumer消费消息过程中会使用到)
(2)、初始化PullAPIWrapper
(3)、初始化offsetStore
(4)、ConsumeMessageService初始化及启动
(5)、初始化MQClientInstance,将DefaultMQProducerImpl注册到MQClientInstance,然后启动MQClientInstance
3、ConsumeMessageService启动:主要启动一个清除过期消息的定时任务
消费者消费消息时,如果消费失败,消息会暂存到当前队列对应的ProcessQueue实例中,如果消息长期消费失败,则需要对这些缓存的消息进行清除,并且上报给服务端,后续重试消费。
4、MQClientInstance启动:MQClientInstance启动过程与生产者启动过程一致。
总结
1、消费者启动过程比生产者启动过程稍微复杂,内部涉及的组件较多,在了解其过程之前,最好先了解内部不同部件的主要功能及维护了哪些关键信息。
2、消费者在启动过程,主要还是对相应的组件进行初始化、启动对应的定时任务。但有一点很重要,在启动过程会触发消息消费,消费流程在下一节详细分析。
以上是关于RocketMQ Consumer启动过程的主要内容,如果未能解决你的问题,请参考以下文章