5RocketMQ 源码解析之 命名服务启动
Posted carl-zhao
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了5RocketMQ 源码解析之 命名服务启动相关的知识,希望对你有一定的参考价值。
在 RocketMQ 当中,消息发送方以及消息接收方都是配置命名服务(Name Server)的地址。通过命名服务解耦合了消息发送者以及消息接收方,不同于 Kafka 直接连接 Broker 地址。命名服务的主要功能包含:Broker 管理以及消息的路由管理。具有如下:
Broker 管理
,NameServer接受来自 Broker 集群的注册,并提供心跳机制来检查Broker是否活着路由管理
,每个NameServer将保存关于Broker集群的整个路由信息和供客户端查询的 queue (队列) 信息。
源码分析基于 RocketMQ - 4.9.3
1、NameServer 启动整体流程
从之前 3、RocketMQ 源码解析之 源代码环境搭建 这篇文章中我可以看到命名服务的启动类是:NamesrvStartup
。而 命名服务最重要的类其实是 NamesrvController
,它控制着命名服务的整个流程。命名服务启动其实就是调用NamesrvController
的三个方法的过程:
NamesrvController#<init>
:通过有参构建器传入NamesrvConfig
以及NettyServerConfig
这两个类,创建NamesrvController
对象。NamesrvController#initialize
:调用该方法对NamesrvController
对象进行属性初始化。NamesrvController#start
: 调用该方法完成对 NameServer 的启动,启动NettyRemotingServer
暴露 Netty 服务端的 Socket 服务。
下面我们就来详细的分析一下这三个过程。
2、NamesrvController 对象构建
NamesrvController.init
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig)
this.namesrvConfig = namesrvConfig;
this.nettyServerConfig = nettyServerConfig;
this.kvConfigManager = new KVConfigManager(this);
this.routeInfoManager = new RouteInfoManager();
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
this.configuration = new Configuration(
log,
this.namesrvConfig, this.nettyServerConfig
);
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
在 NamesrvController 对象构建里面最核心的还是创建 RouteInfoManager
对象,这个对象里面保存了就完成了 Broker 管理
以及 消息的路由管理
。
下面我们来看一下这个对象里面的核心字段:
HashMap<String/* topic */, List<QueueData>> topicQueueTable
:消息主题以及对应的 队列数据映射,在 RocketMQ 当中,一个消息主题可以发送不到同的 Queue 当中,达到负载均衡的目的。HashMap<String/* brokerName */, BrokerData> brokerAddrTable
:broker 名称以及 Broker 数据的映射信息。HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable
:集群名称以及对应的 Broker 名称列表HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable
:broker 地址对应 Broker 通道(Channel) 的对应信息。HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable
:broker 地址以及消息的各种过滤机制。
这里包含了 Broker 对应的所有元数据信息。Broker 每过一段时间(默认 30 秒)就会向 NameServer 发送心跳,告诉 NameServer 我当前还是存活的。并且如果心跳超过 2 分钟没有发送就会把这个 Broker 从上面的 brokerLiveTable
列表当中移除。
RouteInfoManager#scanNotActiveBroker
public void scanNotActiveBroker()
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext())
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis())
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
上面就是 Broker 存活检测逻辑。
3、NamesrvController 初始化
下面我们来看一下 的初始化逻辑。
NamesrvController#initialize
public boolean initialize()
this.kvConfigManager.load();
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
this.registerProcessor();
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable()
@Override
public void run()
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
, 5, 10, TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable()
@Override
public void run()
NamesrvController.this.kvConfigManager.printAllPeriodically();
, 1, 10, TimeUnit.MINUTES);
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED)
// Register a listener to reload SslContext
try
fileWatchService = new FileWatchService(
new String[]
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
,
new FileWatchService.Listener()
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path)
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath))
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
if (path.equals(TlsSystemConfig.tlsServerCertPath))
certChanged = true;
if (path.equals(TlsSystemConfig.tlsServerKeyPath))
keyChanged = true;
if (certChanged && keyChanged)
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
private void reloadServerSslContext()
((NettyRemotingServer) remotingServer).loadSslContext();
);
catch (Exception e)
log.warn("FileWatchService created error, can't load the certificate dynamically");
return true;
上面的逻辑其实挺简单的,主要的逻辑如下:
kvConfigManager
KV 类型的配置管理器从配置文件中加载配置- 创建 Netty 远程服务
NettyRemotingServer
暴露命名服务 Socket 通信 - 创建一个远程线程池
remotingExecutor
用于异步处理客户端的 Socket 连接请求,默认没 8 个工作线程。 registerProcessor()
:向NettyRemotingServer
远程服务端注册请求处理器(DefaultRequestProcessor
,这个处理器我们稍后在分析),并且使用上一步创建的remotingExecutor
进行异步处理- 使用单个线程池调用
RouteInfoManager#scanNotActiveBroker
扫描并删除 2 分钟没有心跳的 Broker 以及KVConfigManager#printAllPeriodically
打印kvConfigManager
KV 类型的配置管理器里面的配置值。 - 如果服务器开启了 SSL 加密传输就启动
FileWatchService
定时扫描文件更新 SSL
DefaultRequestProcessor
其实就是整个 NameServer 处理 Socket 请求的类.其实最终就是操作 KVConfigManager
这个配置类或者 RouteInfoManager
这个类。
4、NamesrvController 启动
NamesrvController 的启动就相对比较简单了。
public void start() throws Exception
this.remotingServer.start();
if (this.fileWatchService != null)
this.fileWatchService.start();
它的只包括了两个步骤:
NettyRemotingServer#start
,调用 Netty 的服务端 ServerBootstrap 进行服务端启动暴露 Socket 服务,默认是9876
;- 启动
FileWatchService
服务,当 SSL 有更新时,就更新 SSL 配置。
以上是关于5RocketMQ 源码解析之 命名服务启动的主要内容,如果未能解决你的问题,请参考以下文章