7RocketMQ 源码解析之 Broker 启动(下)
Posted carl-zhao
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了7RocketMQ 源码解析之 Broker 启动(下)相关的知识,希望对你有一定的参考价值。
在前面的一篇文章 – 6、RocketMQ 源码解析之 Broker 启动(上) 分析了一下 Broker 在启动的时候他自身做了哪些事,以及把 Broker 相关的信息注册到 NameServer 中去,下面我们就通过 Broker 把元信息注册到 NameServer 来了解一下 RocketMQ 是如何进行网络间通信的。
1、整体时序图
在分析 Broker 与 NameServer 通信之前,我们首先来看一下大体的时序图。这样就有一个大体的印象。这样在看源码的时候就不会迷失在细节的海洋中。
上面的图就是 Broker 与 NameServer 之间的通信图时序图,我大概解析一下它们的处理过程:
- 首先在 Broker 启动的时候就会使用
ScheduledExecutorService
每隔 60 秒定时调用BrokerController#registerBrokerAll
向 NameServer 进行元数据注册。 - 向 NameServer 主要注册两样东西,一个是 Topic 的配置信息以及
FilterServer
。Topic 的配置信息是通过TopicConfigManager
进行管理的而FilterServer
是通过FilterServerManager
进行管理的。 - 把需要注册的信息构造好然后调用
BrokerOuterAPI
向NameServer
进行注册。在这里服务间通信是通过 Netty 异步调用,所以 Broker 通过CountDownLatch
异步转成同步等待 NameServer 处理完成注册。关于网络通信过程大家可以查看之前的博客 - 4、RocketMQ 源码解析之 网络通信 Netty。 - NameServer 接收到 Broker 传递过来的请求就会调用
DefaultRequestProcessor#registerBrokerWithFilterServer
进行元数据注册。 - 首先会请求对象进行解码,然后调用
RouteInfoManager#registerBroker
完成最终的注册。
细节信息我们稍后分析。
2、Broker 注册分析
Broker 注册的时候其实包括两个部分,一个是 Topic 的配置信息,另一个就是 FIlterServer 的信息。
2.1 Topic 配置信息
Topic 的配置信息其实是由 TopicConfigManager
进行管理的,它其实是以 Topic 名称为 Key,然后以 TopicConfig
为 Value 的 Map
形式来进行管理的。下面我们来看一下 TopicConfig
的数据结构。
- topicName:topic 名称
- readQueueNums:读列表数,默认 16 个
- writeQueueNums:写列表数,默认16 个
- perm:是否可读可写
- order:是否有序
TopicConfigManager
会创建一些默认的 Topic,包括:
- SELF_TEST_TOPIC
- TBW102
- BenchmarkTest
- DefaultCluster
- OFFSET_MOVED_EVENT
- SCHEDULE_TOPIC_XXXX
- RMQ_SYS_TRACE_TOPIC
- …
其中最重要的就是 TBW102
,只有在 Broker 端开启允许自动创建 Topic 这个主题才会创建(默认允许自动创建)。
下面就是 Broker 启动的时候默认向 NameServer 注册的 Topic 信息:
"filterServerList": [],
"topicConfigSerializeWrapper":
"dataVersion":
"counter": 3,
"timestamp": 1614867690109
,
"topicConfigTable":
"SCHEDULE_TOPIC_XXXX":
"order": false,
"perm": 6,
"readQueueNums": 18,
"topicFilterType": "SINGLE_TAG",
"topicName": "SCHEDULE_TOPIC_XXXX",
"topicSysFlag": 0,
"writeQueueNums": 18
,
"TopicTest":
"order": false,
"perm": 6,
"readQueueNums": 4,
"topicFilterType": "SINGLE_TAG",
"topicName": "TopicTest",
"topicSysFlag": 0,
"writeQueueNums": 4
,
"SELF_TEST_TOPIC":
"order": false,
"perm": 6,
"readQueueNums": 1,
"topicFilterType": "SINGLE_TAG",
"topicName": "SELF_TEST_TOPIC",
"topicSysFlag": 0,
"writeQueueNums": 1
,
"DefaultCluster":
"order": false,
"perm": 7,
"readQueueNums": 16,
"topicFilterType": "SINGLE_TAG",
"topicName": "DefaultCluster",
"topicSysFlag": 0,
"writeQueueNums": 16
,
"DefaultCluster_REPLY_TOPIC":
"order": false,
"perm": 6,
"readQueueNums": 1,
"topicFilterType": "SINGLE_TAG",
"topicName": "DefaultCluster_REPLY_TOPIC",
"topicSysFlag": 0,
"writeQueueNums": 1
,
"RMQ_SYS_TRANS_HALF_TOPIC":
"order": false,
"perm": 6,
"readQueueNums": 1,
"topicFilterType": "SINGLE_TAG",
"topicName": "RMQ_SYS_TRANS_HALF_TOPIC",
"topicSysFlag": 0,
"writeQueueNums": 1
,
"broker-a":
"order": false,
"perm": 7,
"readQueueNums": 1,
"topicFilterType": "SINGLE_TAG",
"topicName": "broker-a",
"topicSysFlag": 0,
"writeQueueNums": 1
,
"TBW102":
"order": false,
"perm": 7,
"readQueueNums": 8,
"topicFilterType": "SINGLE_TAG",
"topicName": "TBW102",
"topicSysFlag": 0,
"writeQueueNums": 8
,
"BenchmarkTest":
"order": false,
"perm": 6,
"readQueueNums": 1024,
"topicFilterType": "SINGLE_TAG",
"topicName": "BenchmarkTest",
"topicSysFlag": 0,
"writeQueueNums": 1024
,
"OFFSET_MOVED_EVENT":
"order": false,
"perm": 6,
"readQueueNums": 1,
"topicFilterType": "SINGLE_TAG",
"topicName": "OFFSET_MOVED_EVENT",
"topicSysFlag": 0,
"writeQueueNums": 1
,
"%RETRY%please_rename_unique_group_name_4":
"order": false,
"perm": 6,
"readQueueNums": 1,
"topicFilterType": "SINGLE_TAG",
"topicName": "%RETRY%please_rename_unique_group_name_4",
"topicSysFlag": 0,
"writeQueueNums": 1
2.2 FilterServer 配置信息
FilterServer
的配置信息其实是由 FilterServerManager
进行管理的,它其实是以网络 Channel
为 Key,然后以 FilterServerInfo
为 Value 的 Map
形式来进行管理的。下面我们来看一下 FilterServerInfo
的数据结构。
filterServerAddr
:过滤服务的地址lastUpdateTimestamp
:最后一次更新时间
当 Broker 启动向 NameServer 进行注册的时候默认为空
3、NameServer 注册分析
其实从上面的 Topic 配置信息可以看到,其实 Borker 与 NameServer 之间的网络序列化方式是通过 Json。
除了上面提到的 Topic 配置信息以及 FIlterServer 信息这些 body 信息,Broker 注册过来的时候还传递了其它 header 信息。
当 NameServer 把 Broker 传递过来的元信息进行反序列化之后就会通过 RouteInfoManager 对 Broker 元数据进行注册。
下面我们来看一下 RouteInfoManager 这个对象里面的核心字段:
HashMap<String/* topic */, List<QueueData>> topicQueueTable
:消息主题以及对应的 队列数据映射,在 RocketMQ 当中,一个消息主题可以发送不到同的 Queue 当中,达到负载均衡的目的。HashMap<String/* brokerName */, BrokerData> brokerAddrTable
:broker 名称以及 Broker 数据的映射信息。HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable
:集群名称以及对应的 Broker 名称列表HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable
:broker 地址对应 Broker 通道(Channel) 的对应信息。HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable
:broker 地址以及消息的各种过滤机制。
RouteInfoManager#registerBroker
元数据的注册过程其实挺简单的,大家可以自行查看。
以上是关于7RocketMQ 源码解析之 Broker 启动(下)的主要内容,如果未能解决你的问题,请参考以下文章