RocketMQ源码—NameServer启动流程源码解析
Posted 刘Java
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码—NameServer启动流程源码解析相关的知识,希望对你有一定的参考价值。
详细介绍了RocketMQ的NameServer启动流程源码解析,包括RocketMQ的RPC通信模型。
文章目录
- 0 NameServer概述
- 1 NamesrvStartup启动入口
- 2 createNamesrvController创建NamesrvController
- 3 start启动NamesrvController
- 4 RocaktMQ的RPC通信模型设计初探
- 5 NameServer启动流程总结
0 NameServer概述
我们要先学会了如何使用RocketMQ,并且看了官方文档之后再来看源码,那样就能节约很多的时间。比如RocketMQ的架构设计,以及Apache RocketMQ开发者指南。
下面是RocketMQ的架构设计图:
在官方文档中可以得知,NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:
-
Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;
-
路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。
从上面可以得知,无论是Producer、Conumser、Broker都会直接和NameServer进行通信,实际上NameServer是非常重要的角色。但是由于大多数程序员都是一个“使用者”的角色,而使用RocketMQ的API的时候一般也不会接触到NameServer,因此对于NameServer的了解较少。更多的NameServer的特性需要查看官方文档。
实际上RocketMQ在部署启动时,会首先启动NameServer,因此本系列的源码分析文章中,将会以NameServer的启动作为入口,一步步的向后分析Broker、Consumer、Producer的核心源码。
1 NamesrvStartup启动入口
NameServer的启动入口就是namesrv模块的NamesrvStartup类的main方法。该方法将会创建并且初始化一个NamesrvController实例。
public static void main(String[] args)
main0(args);
public static NamesrvController main0(String[] args)
try
/*
* 1、创建NamesrvController
*/
NamesrvController controller = createNamesrvController(args);
/*
* 2、启动NamesrvController
*/
start(controller);
/*
* 启动成功打印日志
*/
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
catch (Throwable e)
e.printStackTrace();
System.exit(-1);
return null;
可以看到入口方法还是比较简单的,我们主要看createNamesrvController创建以及start启动NamesrvController的两个方法的源码。
2 createNamesrvController创建NamesrvController
该方法主要是解析命令行,加载NameServer配置和NettyServer各种配置(解析命令行中-c指定的配置文件)并保存起来然后创建一个NamesrvController。NamesrvController相当于NameServer的一个中央控制器类。
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException
//设置RocketMQ的版本信息,属性名为rocketmq.remoting.version
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
/*jar包启动时,构建命令行操作的指令,使用main方法启动可以忽略*/
Options options = ServerUtil.buildCommandlineOptions(new Options());
//mqnamesrv命令文件
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine)
System.exit(-1);
return null;
//创建NameServer的配置类,包含NameServer的配置,比如ROCKETMQ_HOME
final NamesrvConfig namesrvConfig = new NamesrvConfig();
//和NettyServer的配置类
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
//netty服务的监听端口设置为9876
nettyServerConfig.setListenPort(9876);
//判断命令行中是否包含字符'c',即是否包含通过命令行指定配置文件的命令
//例如,启动Broker的时候添加的 -c /Volumes/Samsung/Idea/rocketmq/config/conf/broker.conf命令
if (commandLine.hasOption('c'))
/*解析配置文件并且存入NamesrvConfig和NettyServerConfig中,没有的话就不用管*/
String file = commandLine.getOptionValue('c');
if (file != null)
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
/*判断命令行中是否包含字符'p',如果存在则打印配置信息并结束jvm运行,没有的话就不用管*/
if (commandLine.hasOption('p'))
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
//把命令行的配置解析到namesrvConfig
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
//如果不存在ROCKETMQ_HOME的配置,那么打印异常并退出程序,这就是最开始启动NameServer是抛出异常的位置
if (null == namesrvConfig.getRocketmqHome())
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
/*一系列日志的配置*/
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
//打印nameServer 服务器配置类和 netty 服务器配置类的配置信息
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
/*
* 根据namesrvConfig和nettyServerConfig创建NamesrvController
*/
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// 将所有的-c的外部配置信息保存到NamesrvController中的Configuration对象属性的allConfigs属性中
controller.getConfiguration().registerConfig(properties);
return controller;
2.1 new NamesrvController创建控制器
createNamesrvController方法中,会根据NamesrvConfig和NettyServerConfig调用NamesrvController的构造器创建一个实例。
这个构造器源码也比较简单,就是初始化一些属性。
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig)
//nameserver的配置
this.namesrvConfig = namesrvConfig;
//nameserver的netty服务的配置
this.nettyServerConfig = nettyServerConfig;
//kv配置管理器
this.kvConfigManager = new KVConfigManager(this);
//路由信息管理器
this.routeInfoManager = new RouteInfoManager();
//Broker连接的各种事件的处理服务,是处理Broker连接发生变化的服务
//主要用于监听在Channel通道关闭事件触发时调用RouteInfoManager#onChannelDestroy清除路由信息
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
//配置类,并将namesrvConfig和nettyServerConfig的配置注册到内部的allConfigs集合中
this.configuration = new Configuration(log, this.namesrvConfig, this.nettyServerConfig);
//存储路径配置
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
这里面还会初始化一个brokerHousekeepingService,他是一个ChannelEventListener的实现,主要用于主要用于监听Broker的Channel通道关闭事件,并在事件触发时调用RouteInfoManager#onChannelDestroy清除路由信息。
public class BrokerHousekeepingService implements ChannelEventListener
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final NamesrvController namesrvController;
public BrokerHousekeepingService(NamesrvController namesrvController)
this.namesrvController = namesrvController;
//连接事件,不处理
@Override
public void onChannelConnect(String remoteAddr, Channel channel)
//连接关闭事件
@Override
public void onChannelClose(String remoteAddr, Channel channel)
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
//连接异常事件
@Override
public void onChannelException(String remoteAddr, Channel channel)
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
//连接闲置事件
@Override
public void onChannelIdle(String remoteAddr, Channel channel)
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
3 start启动NamesrvController
在创建NamesrvController之后,调用start方法对其进行启动,实际上就是启动NameServer中的NettyServer服务。
该方法主要做了三件事:
-
调用initialize方法初始化NettyServer。创建netty远程服务,初始化Netty线程池,注册请求处理器,配置定时任务,用于扫描并移除不活跃的Broker等操作。
-
对JVM添加关闭钩子方法,在NameServer的JVM关闭之前执行,关闭NameServerController中线程池,NettyServer进行关闭进行一些内存清理、对象销毁等操作。
-
调用start方法启动NettyServer,并进行监听。
public static NamesrvController start(final NamesrvController controller) throws Exception
//不能为null
if (null == controller)
throw new IllegalArgumentException("NamesrvController is null");
/*
* 1 初始化NettyServer
* 创建netty远程服务,初始化Netty线程池,注册请求处理器,配置定时任务,用于扫描并移除不活跃的Broker等操作。
*/
boolean initResult = controller.initialize();
//初始化失败则退出程序
if (!initResult)
controller.shutdown();
System.exit(-3);
/*
* 2 添加关闭钩子方法,在NameServer关闭之前执行,进行一些内存清理、对象销毁等操作
*/
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>()
@Override
public Void call() throws Exception
controller.shutdown();
return null;
));
/*
* 3 启动NettyServer,并进行监听
*/
controller.start();
return controller;
3.1 initialize初始化NettyServer
该方法用于初始化NettyServer。将会执行创建netty远程服务,初始化Netty线程池,注册请求处理器,配置定时任务,用于扫描并移除不活跃的Broker等初始化操作。
initialize的大概步骤为:
-
加载KV配置并存储到kvConfigManager内部的configTable属性中。
-
创建NameServer的netty远程服务。remotingServer是一个基于Netty的用于NameServer与Broker、Consumer、Producer进行网络通信的服务端。
-
创建netty远程通信执行器线程池remotingExecutor,线程数默认8,线程名以RemotingExecutorThread_为前缀,用作默认的请求处理线程池。
-
注册默认请求处理器DefaultRequestProcessor到remotingServer中。
-
启动两个定时任务。其中一个每隔十秒钟检测不活跃的Broker并清理相关路由信息,这是一个核心知识点,另一个任务则是每隔十分钟打印kv配置信息。
public boolean initialize()
/*
* 1 加载KV配置并存储到kvConfigManager内部的configTable属性中
* KVConfig配置文件默认路径是 $user.home/namesrv/kvConfig.json
*/
this.kvConfigManager.load();
/*
* 2 创建NameServer的netty远程服务
* 设置了一个ChannelEventListener,为此前创建brokerHousekeepingService
* remotingServer是一个基于Netty的用于NameServer与Broker、Consumer、Producer进行网络通信的服务端
*/
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
/*
* 3 创建netty远程通信执行器线程池,用作默认的请求处理线程池,线程名以RemotingExecutorThread_为前缀
*/
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
/*
* 4 注册默认请求处理器DefaultRequestProcessor
* 将remotingExecutor绑定到DefaultRequestProcessor上,用作默认的请求处理线程池
* DefaultRequestProcessor绑定到remotingServer的defaultRequestProcessor属性上
*/
this.registerProcessor();
/*
* 5 启动一个定时任务
* 首次启动延迟5秒执行,此后每隔10秒执行一次扫描无效的Broker,并清除Broker相关路由信息的任务
*/
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable()
@Override
public void run()
//扫描notActive的broker
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
, 5, 10, TimeUnit.SECONDS);
/*
* 6 启动一个定时任务
* 首次启动延迟1分钟执行,此后每隔10分钟执行一次打印kv配置信息的任务
*/
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable()
@Override
public void run()
//打印kv配置信息
NamesrvController.this.kvConfigManager.printAllPeriodically();
, 1, 10, TimeUnit.MINUTES);
/*
* Tls传输相关配置,通信安全的文件监听模块,用来观察网络加密配置文件的更改
*/
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED)
// Register a listener to reload SslContext
try
fileWatchService = new FileWatchService(
new String[]
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
,
new FileWatchService.Listener()
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path)
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath))
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
if (path.equals(TlsSystemConfig.tlsServerCertPath))
certChanged = true;
if (path.equals(TlsSystemConfig.tlsServerKeyPath))
keyChanged = true;
if (certChanged && keyChanged)
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
private void reloadServerSslContext()
((NettyRemotingServer) remotingServer).loadSslContext();
);
catch (Exception e)
log.warn("FileWatchService created error, can't load the certificate dynamically");
return true;
3.1.1 创建NettyRemotingServer
initialize方法的第一步是创建一个NettyRemotingServer,remotingServer是一个基于Netty的用于NameServer与Broker、Consumer、Producer进行网络通信的服务端。
NettyRemotingServer的构造器主要做了以下事:
-
创建serverBootstrap,这是etty服务端启动类,引导启动服务端。
-
创建一个公共线程池publicExecutor,线程数默认4个线程,线程名以NettyServerPublicExecutor_为前缀。用在registerProcessor方法中,在该方法注册Netty事件处理器时如果没指定线程池,则会统一使用publicExecutor来处理具体的业务,用于处理某些特定的请求业务,例如异步发送消息的回调。
-
根据是否使用epoll模型初始化Boss EventLoopGroup和Worker EventLoopGroup这两个事件循环组,线程数分别默认1个和3个线程,线程名分别以NettyEPOLLBoss_和NettyServerEPOLLSelector_为前缀。这两个线程组对于熟悉Netty的同学应该不陌生了,boss用于处理连接事件,worker用于处理读写事件。
- 如果是linux内核,并且指定开启epoll,并且系统支持epoll,才会使用EpollEventLoopGroup类型,否则使用NioEventLoopGroup类型。
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener)
//设置服务器单向、异步发送信号量
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
//创建Netty服务端启动类,引导启动服务端
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
this.channelEventListener = channelEventListener;
//服务器回调执行线程数量,默认设置为4
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0)
publicThreadNums = 4;
//创建一个公共线程池,负责处理某些请求业务,例如发送异步消息回调,线程名以NettyServerPublicExecutor_为前缀
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory()
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThreadRocketMQ源码系列 NameServer 核心源码解析