分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(下)
Posted 芋道源码
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(下)相关的知识,希望对你有一定的参考价值。
摘要: 原创出处 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文主要基于 RocketMQ 4.0.x 正式版
1、概述
2、Consumer
3、PushConsumer 一览
4、PushConsumer 订阅
5、PushConsumer 消息队列分配
6、PushConsumer 消费进度读取
7、PushConsumer 拉取消息
8、PushConsumer 消费消息
9、PushConsumer 发回消费失败消息
10、Consumer 消费进度
11、结尾
1、概述
本文接:《RocketMQ 源码分析 —— Message 拉取与消费(上)》。
主要解析 Consumer
在 消费 逻辑涉及到的源码。
2、Consumer
MQ 提供了两类消费者:
PushConsumer:
在大多数场景下使用。
名字虽然是
Push
开头,实际在实现时,使用Pull
方式实现。通过Pull
不断不断不断轮询Broker
获取消息。当不存在新消息时,Broker
会挂起请求,直到有新消息产生,取消挂起,返回新消息。这样,基本和Broker
主动Push
做到接近的实时性(当然,还是有相应的实时性损失)。原理类似 长轮询(Long-Polling
)。PullConsumer
本文主要讲解 PushConsumer
,部分讲解 PullConsumer
,跳过 顺序消费
。
本文主要讲解 PushConsumer
,部分讲解 PullConsumer
,跳过 顺序消费
。
本文主要讲解 PushConsumer
,部分讲解 PullConsumer
,跳过 顺序消费
。
3、PushConsumer 一览
先看一张 PushConsumer
包含的组件以及组件之间的交互图:
RebalanceService
:均衡消息队列服务,负责分配当前Consumer
可消费的消息队列(MessageQueue
)。当有新的Consumer
的加入或移除,都会重新分配消息队列。PullMessageService
:拉取消息服务,不断不断不断从Broker
拉取消息,并提交消费任务到ConsumeMessageService
。ConsumeMessageService
:消费消息服务,不断不断不断消费消息,并处理消费结果。RemoteBrokerOffsetStore
:Consumer
消费进度管理,负责从Broker
获取消费进度,同步消费进度到Broker
。ProcessQueue
:消息处理队列。MQClientInstance
:封装对Namesrv
,Broker
的 API调用,提供给Producer
、Consumer
使用。
4、PushConsumer 订阅
DefaultMQPushConsumerImpl#subscribe(...)
1: public void subscribe(String topic, String subExpression) throws MQClientException {
2: try {
3: // 创建订阅数据
4: SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
5: topic, subExpression);
6: this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
7: // 通过心跳同步Consumer信息到Broker
8: if (this.mQClientFactory != null) {
9: this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
10: }
11: } catch (Exception e) {
12: throw new MQClientException("subscription exception", e);
13: }
14: }
说明 :订阅
Topic
。第 3 至 6 行 :创建订阅数据。详细解析见:FilterAPI.buildSubscriptionData(...)。
第 7 至 10 行 :通过心跳同步
Consumer
信息到Broker
。
FilterAPI.buildSubscriptionData(...)
1: public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
2: String subString) throws Exception {
3: SubscriptionData subscriptionData = new SubscriptionData();
4: subscriptionData.setTopic(topic);
5: subscriptionData.setSubString(subString);
6: // 处理订阅表达式
7: if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
8: subscriptionData.setSubString(SubscriptionData.SUB_ALL);
9: } else {
10: String[] tags = subString.split("\\|\\|");
11: if (tags.length > 0) {
12: for (String tag : tags) {
13: if (tag.length() > 0) {
14: String trimString = tag.trim();
15: if (trimString.length() > 0) {
16: subscriptionData.getTagsSet().add(trimString);
17: subscriptionData.getCodeSet().add(trimString.hashCode());
18: }
19: }
20: }
21: } else {
22: throw new Exception("subString split error");
23: }
24: }
25:
26: return subscriptionData;
27: }
说明 :根据
Topic
和 订阅表达式 创建订阅数据subscriptionData.subVersion = System.currentTimeMillis()。
DefaultMQPushConsumer#registerMessageListener(...)
1: public void registerMessageListener(MessageListenerConcurrently messageListener) {
2: this.messageListener = messageListener;
3: this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
4: }
说明 :注册消息监听器。
5、PushConsumer 消息队列分配
RebalanceService
1: public class RebalanceService extends ServiceThread {
2:
3: /**
4: * 等待间隔,单位:毫秒
5: */
6: private static long waitInterval =
7: Long.parseLong(System.getProperty(
8: "rocketmq.client.rebalance.waitInterval", "20000"));
9:
10: private final Logger log = ClientLogger.getLog();
11: /**
12: * MQClient对象
13: */
14: private final MQClientInstance mqClientFactory;
15:
16: public RebalanceService(MQClientInstance mqClientFactory) {
17: this.mqClientFactory = mqClientFactory;
18: }
19:
20: @Override
21: public void run() {
22: log.info(this.getServiceName() + " service started");
23:
24: while (!this.isStopped()) {
25: this.waitForRunning(waitInterval);
26: this.mqClientFactory.doRebalance();
27: }
28:
29: log.info(this.getServiceName() + " service end");
30: }
31:
32: @Override
33: public String getServiceName() {
34: return RebalanceService.class.getSimpleName();
35: }
36: }
说明 :均衡消息队列服务,负责分配当前
Consumer
可消费的消息队列(MessageQueue
)。第 26 行 :调用
MQClientInstance#doRebalance(...)
分配消息队列。目前有三种情况情况下触发:详细解析见:MQClientInstance#doRebalance(...)。
如
第25行
等待超时,每 20s 调用一次。PushConsumer
启动时,调用rebalanceService#wakeup(...)
触发。Broker
通知Consumer
加入 或 移除时,Consumer
响应通知,调用rebalanceService#wakeup(...)
触发。
MQClientInstance#doRebalance(...)
1: public void doRebalance() {
2: for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
3: MQConsumerInner impl = entry.getValue();
4: if (impl != null) {
5: try {
6: impl.doRebalance();
7: } catch (Throwable e) {
8: log.error("doRebalance exception", e);
9: }
10: }
11: }
12: }
说明 :遍历当前
Client
包含的consumerTable
(Consumer
集合 ),执行消息队列分配。疑问:目前代码调试下来,
consumerTable
只包含Consumer
自己。以上是关于分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(下)的主要内容,如果未能解决你的问题,请参考以下文章
分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(下)
分布式技术专题分布式消息队列-RocketMQ延迟消息实现原理和源码分析