7RocketMQ 源码解析之 Broker 启动(下)

Posted carl-zhao

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了7RocketMQ 源码解析之 Broker 启动(下)相关的知识,希望对你有一定的参考价值。

在前面的一篇文章 – 6、RocketMQ 源码解析之 Broker 启动(上) 分析了一下 Broker 在启动的时候他自身做了哪些事,以及把 Broker 相关的信息注册到 NameServer 中去,下面我们就通过 Broker 把元信息注册到 NameServer 来了解一下 RocketMQ 是如何进行网络间通信的。

1、整体时序图

在分析 Broker 与 NameServer 通信之前,我们首先来看一下大体的时序图。这样就有一个大体的印象。这样在看源码的时候就不会迷失在细节的海洋中。

上面的图就是 Broker 与 NameServer 之间的通信图时序图,我大概解析一下它们的处理过程:

  • 首先在 Broker 启动的时候就会使用 ScheduledExecutorService 每隔 60 秒定时调用 BrokerController#registerBrokerAll 向 NameServer 进行元数据注册。
  • 向 NameServer 主要注册两样东西,一个是 Topic 的配置信息以及 FilterServer。Topic 的配置信息是通过 TopicConfigManager 进行管理的而 FilterServer 是通过 FilterServerManager 进行管理的。
  • 把需要注册的信息构造好然后调用 BrokerOuterAPINameServer 进行注册。在这里服务间通信是通过 Netty 异步调用,所以 Broker 通过 CountDownLatch 异步转成同步等待 NameServer 处理完成注册。关于网络通信过程大家可以查看之前的博客 - 4、RocketMQ 源码解析之 网络通信 Netty
  • NameServer 接收到 Broker 传递过来的请求就会调用 DefaultRequestProcessor#registerBrokerWithFilterServer 进行元数据注册。
  • 首先会请求对象进行解码,然后调用 RouteInfoManager#registerBroker 完成最终的注册。

细节信息我们稍后分析。

2、Broker 注册分析

Broker 注册的时候其实包括两个部分,一个是 Topic 的配置信息,另一个就是 FIlterServer 的信息。

2.1 Topic 配置信息

Topic 的配置信息其实是由 TopicConfigManager 进行管理的,它其实是以 Topic 名称为 Key,然后以 TopicConfig 为 Value 的 Map 形式来进行管理的。下面我们来看一下 TopicConfig 的数据结构。

  • topicName:topic 名称
  • readQueueNums:读列表数,默认 16 个
  • writeQueueNums:写列表数,默认16 个
  • perm:是否可读可写
  • order:是否有序

TopicConfigManager 会创建一些默认的 Topic,包括:

  • SELF_TEST_TOPIC
  • TBW102
  • BenchmarkTest
  • DefaultCluster
  • OFFSET_MOVED_EVENT
  • SCHEDULE_TOPIC_XXXX
  • RMQ_SYS_TRACE_TOPIC

其中最重要的就是 TBW102,只有在 Broker 端开启允许自动创建 Topic 这个主题才会创建(默认允许自动创建)。

下面就是 Broker 启动的时候默认向 NameServer 注册的 Topic 信息:


	"filterServerList": [],
	"topicConfigSerializeWrapper": 
		"dataVersion": 
			"counter": 3,
			"timestamp": 1614867690109
		,
		"topicConfigTable": 
			"SCHEDULE_TOPIC_XXXX": 
				"order": false,
				"perm": 6,
				"readQueueNums": 18,
				"topicFilterType": "SINGLE_TAG",
				"topicName": "SCHEDULE_TOPIC_XXXX",
				"topicSysFlag": 0,
				"writeQueueNums": 18
			,
			"TopicTest": 
				"order": false,
				"perm": 6,
				"readQueueNums": 4,
				"topicFilterType": "SINGLE_TAG",
				"topicName": "TopicTest",
				"topicSysFlag": 0,
				"writeQueueNums": 4
			,
			"SELF_TEST_TOPIC": 
				"order": false,
				"perm": 6,
				"readQueueNums": 1,
				"topicFilterType": "SINGLE_TAG",
				"topicName": "SELF_TEST_TOPIC",
				"topicSysFlag": 0,
				"writeQueueNums": 1
			,
			"DefaultCluster": 
				"order": false,
				"perm": 7,
				"readQueueNums": 16,
				"topicFilterType": "SINGLE_TAG",
				"topicName": "DefaultCluster",
				"topicSysFlag": 0,
				"writeQueueNums": 16
			,
			"DefaultCluster_REPLY_TOPIC": 
				"order": false,
				"perm": 6,
				"readQueueNums": 1,
				"topicFilterType": "SINGLE_TAG",
				"topicName": "DefaultCluster_REPLY_TOPIC",
				"topicSysFlag": 0,
				"writeQueueNums": 1
			,
			"RMQ_SYS_TRANS_HALF_TOPIC": 
				"order": false,
				"perm": 6,
				"readQueueNums": 1,
				"topicFilterType": "SINGLE_TAG",
				"topicName": "RMQ_SYS_TRANS_HALF_TOPIC",
				"topicSysFlag": 0,
				"writeQueueNums": 1
			,
			"broker-a": 
				"order": false,
				"perm": 7,
				"readQueueNums": 1,
				"topicFilterType": "SINGLE_TAG",
				"topicName": "broker-a",
				"topicSysFlag": 0,
				"writeQueueNums": 1
			,
			"TBW102": 
				"order": false,
				"perm": 7,
				"readQueueNums": 8,
				"topicFilterType": "SINGLE_TAG",
				"topicName": "TBW102",
				"topicSysFlag": 0,
				"writeQueueNums": 8
			,
			"BenchmarkTest": 
				"order": false,
				"perm": 6,
				"readQueueNums": 1024,
				"topicFilterType": "SINGLE_TAG",
				"topicName": "BenchmarkTest",
				"topicSysFlag": 0,
				"writeQueueNums": 1024
			,
			"OFFSET_MOVED_EVENT": 
				"order": false,
				"perm": 6,
				"readQueueNums": 1,
				"topicFilterType": "SINGLE_TAG",
				"topicName": "OFFSET_MOVED_EVENT",
				"topicSysFlag": 0,
				"writeQueueNums": 1
			,
			"%RETRY%please_rename_unique_group_name_4": 
				"order": false,
				"perm": 6,
				"readQueueNums": 1,
				"topicFilterType": "SINGLE_TAG",
				"topicName": "%RETRY%please_rename_unique_group_name_4",
				"topicSysFlag": 0,
				"writeQueueNums": 1
			
		
	

2.2 FilterServer 配置信息

FilterServer 的配置信息其实是由 FilterServerManager 进行管理的,它其实是以网络 Channel 为 Key,然后以 FilterServerInfo 为 Value 的 Map 形式来进行管理的。下面我们来看一下 FilterServerInfo 的数据结构。

  • filterServerAddr:过滤服务的地址
  • lastUpdateTimestamp:最后一次更新时间

当 Broker 启动向 NameServer 进行注册的时候默认为空

3、NameServer 注册分析

其实从上面的 Topic 配置信息可以看到,其实 Borker 与 NameServer 之间的网络序列化方式是通过 Json。
除了上面提到的 Topic 配置信息以及 FIlterServer 信息这些 body 信息,Broker 注册过来的时候还传递了其它 header 信息。

当 NameServer 把 Broker 传递过来的元信息进行反序列化之后就会通过 RouteInfoManager 对 Broker 元数据进行注册。

下面我们来看一下 RouteInfoManager 这个对象里面的核心字段:

  • HashMap<String/* topic */, List<QueueData>> topicQueueTable :消息主题以及对应的 队列数据映射,在 RocketMQ 当中,一个消息主题可以发送不到同的 Queue 当中,达到负载均衡的目的。
  • HashMap<String/* brokerName */, BrokerData> brokerAddrTable :broker 名称以及 Broker 数据的映射信息。
  • HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable:集群名称以及对应的 Broker 名称列表
  • HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable:broker 地址对应 Broker 通道(Channel) 的对应信息。
  • HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable:broker 地址以及消息的各种过滤机制。

RouteInfoManager#registerBroker 元数据的注册过程其实挺简单的,大家可以自行查看。

以上是关于7RocketMQ 源码解析之 Broker 启动(下)的主要内容,如果未能解决你的问题,请参考以下文章

6RocketMQ 源码解析之 Broker 启动(上)

6RocketMQ 源码解析之 Broker 启动(上)

5RocketMQ 源码解析之 命名服务启动

5RocketMQ 源码解析之 命名服务启动

RocketMQ源码—Broker启动流程源码解析一万字

RocketMQ源码系列 broker启动流程源码解析