RocketMQ源码—Broker启动流程源码解析一万字
Posted 刘Java
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码—Broker启动流程源码解析一万字相关的知识,希望对你有一定的参考价值。
详细介绍了RocketMQ的Broker启动流程源码解析。
此前我们学习了NameServer的启动流程源码:RocketMQ源码(2)—NameServer启动流程源码解析,现在我们来学习Broker的启动流程源码,因为RocketMQ在启动的时候,最先启动NameServer,然后再启动Broker的。
同NameServer源码学习的开头说的那样,我们一样要先学会了如何使用RocketMQ,并且看了官方文档之后再来看源码,那样就能节约很多的时间,并且能够和文档上所讲的对得上号。比如RocketMQ的架构设计,以及Apache RocketMQ开发者指南。
官方介绍:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。
-
Remoting Module:整个Broker的实体,负责处理来自Client端的请求。
-
Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息。
-
Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
-
HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
-
Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。
文章目录
- 1 BrokerStartup启动入口
- 2 createBrokerController创建BrokerController
- 3 Start启动BrokerController
- 4 Broker启动流程总结
1 BrokerStartup启动入口
Broker的启动入口就是broker模块的BrokerStartup类的main方法。该方法将会创建并且初始化一个BrokerController实例。看起来和NameServer的流程差不多。
public static void main(String[] args)
//启动broker的入口
//创建并启动一个BrokerController实例
start(createBrokerController(args));
2 createBrokerController创建BrokerController
该方法主要是解析命令行,加载Broker配置,以及NettyServer、NettyClient的各种配置(解析命令行中-c指定的配置文件)并保存起来,然后进行一些配置的校验,日志的配置,随后创建一个BrokerController实例。
BrokerController相当于Broker的一个中央控制器类。创建了BrokerController实例之后,再调用initialize方法进行初始化操作。这是核心方法。
public static BrokerController createBrokerController(String[] args)
//设置RocketMQ的版本信息,设置属性rocketmq.remoting.version,即当前rocketmq版本
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
try
//PackageConflictDetect.detectFastjson();
/*
* 1 jar包启动时,构建命令行操作的指令,使用main方法启动可以忽略
*/
Options options = ServerUtil.buildCommandlineOptions(new Options());
//mqbroker命令文件
commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
new PosixParser());
if (null == commandLine)
System.exit(-1);
/*
* 2 创建broker的配置类
*/
//创建Broker的配置类,包含Broker的各种配置,比如ROCKETMQ_HOME
final BrokerConfig brokerConfig = new BrokerConfig();
//NettyServer的配置类,Broker作为服务端,比如接收来自客户端的消息的时候
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
//NettyClient的配置类,Broker还会作为客户端,比如连接NameServer的时候
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
// tls安全相关配置
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
//设置作为NettyServer时的监听端口为10911
nettyServerConfig.setListenPort(10911);
//Broker的消息存储配置,例如各种文件大小等
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
//如果broker的角色是slave,设置命中消息在内存的最大比例
//默认broker角色是异步master
if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole())
//30
int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
/*
* 3 解析外部配置文件
* 判断命令行中是否包含字符'c',即是否包含通过命令行指定配置文件的命令
* 例如,启动Broker的时候添加的 -c /Volumes/Samsung/Idea/rocketmq/config/conf/broker.conf命令
*/
if (commandLine.hasOption('c'))
//获取该命令指定的配置文件
String file = commandLine.getOptionValue('c');
if (file != null)
//加载外部配置文件
configFile = file;
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
//将rmqAddressServerDomain、rmqAddressServerSubGroup属性设置为系统属性
properties2SystemEnv(properties);
//设置broker的配置信息
MixAll.properties2Object(properties, brokerConfig);
//设置nettyServer的配置信息
MixAll.properties2Object(properties, nettyServerConfig);
//设置nettyClient的配置信息
MixAll.properties2Object(properties, nettyClientConfig);
//设置messageStore的配置信息
MixAll.properties2Object(properties, messageStoreConfig);
//设置配置文件路径
BrokerPathConfigHelper.setBrokerConfigPath(file);
in.close();
//设置broker的配置信息
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
//如果不存在ROCKETMQ_HOME的配置,那么打印异常并退出程序
if (null == brokerConfig.getRocketmqHome())
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
/*
* 4 获取namesrvAddr,即NameServer的地址,并进行校验
*/
String namesrvAddr = brokerConfig.getNamesrvAddr();
if (null != namesrvAddr)
try
//拆分NameServer的地址
//可以指定多个NameServer的地址,以";"分隔
String[] addrArray = namesrvAddr.split(";");
//将字符串的地址,转换为网络连接的SocketAddress,检测格式是否正确
for (String addr : addrArray)
RemotingUtil.string2SocketAddress(addr);
catch (Exception e)
System.out.printf(
"The Name Server Address[%s] illegal, please set it as follows, \\"127.0.0.1:9876;192.168.0.1:9876\\"%n",
namesrvAddr);
System.exit(-3);
/*
* 4 设置、校验brokerId
*
* 根据broker的角色配置brokerId,默认角色是ASYNC_MASTER
* 通过此配置可知BrokerId为0表示Master,非0表示Slave
*/
switch (messageStoreConfig.getBrokerRole())
case ASYNC_MASTER:
case SYNC_MASTER:
//如果是master角色,那么设置brokerId为0
brokerConfig.setBrokerId(MixAll.MASTER_ID);
break;
case SLAVE:
//如果是slave角色,需要brokerId大于0
if (brokerConfig.getBrokerId() <= 0)
System.out.printf("Slave's brokerId must be > 0");
System.exit(-3);
break;
default:
break;
// 开启 DLeger 的操作
if (messageStoreConfig.isEnableDLegerCommitLog())
brokerConfig.setBrokerId(-1);
/*
* 5 设置高可用通信监听端口,为监听端口+1,默认就是10912
* 该端口主要用于比如主从同步之类的高可用操作
*
* 在配置broker集群的时候需要注意,配置集群时可能会抛出:Address already in use
* 因为一个broker机器会占用三个端口,监听ip端口,以及监听ip端口+1的端口,监听ip端口-2端口
*/
messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
/*
* 6 日志相关配置
*/
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
//Joran 是 logback 使用的一个配置加载库,可以直接调用JoranConfigurator类重新实现logback的配置机制,
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
//配置broker日志文件的路徑
System.setProperty("brokerLogDir", "");
//isolateLogEnable属性表示在同一台机器上部署多个broker时是否区分日志路径,默認false
if (brokerConfig.isIsolateLogEnable())
System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + brokerConfig.getBrokerId());
if (brokerConfig.isIsolateLogEnable() && messageStoreConfig.isEnableDLegerCommitLog())
System.setProperty("brokerLogDir", brokerConfig.getBrokerName() + "_" + messageStoreConfig.getdLegerSelfId());
configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");
/*判断命令行中是否包含字符'p'(printConfigItem)和'm',如果存在则打印配置信息并结束jvm运行,没有的话就不用管*/
if (commandLine.hasOption('p'))
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, brokerConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
MixAll.printObjectProperties(console, nettyClientConfig);
MixAll.printObjectProperties(console, messageStoreConfig);
System.exit(0);
else if (commandLine.hasOption('m'))
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, brokerConfig, true);
MixAll.printObjectProperties(console, nettyServerConfig, true);
MixAll.printObjectProperties(console, nettyClientConfig, true);
MixAll.printObjectProperties(console, messageStoreConfig, true);
System.exit(0);
//打印当前broker的配置日志
log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
MixAll.printObjectProperties(log, brokerConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
MixAll.printObjectProperties(log, nettyClientConfig);
MixAll.printObjectProperties(log, messageStoreConfig);
/*
* 7 实例化BrokerController,设置各种属性
*/
final BrokerController controller = new BrokerController(
brokerConfig,
nettyServerConfig,
nettyClientConfig,
messageStoreConfig);
// 将所有的-c的外部配置信息保存到NamesrvController中的Configuration对象属性的allConfigs属性中
controller.getConfiguration().registerConfig(properties);
/*
* 8 初始化BrokerController
* 创建netty远程服务,初始化Netty线程池,注册请求处理器,配置定时任务,用于扫描并移除不活跃的Broker等操作。
*/
boolean initResult = controller.initialize();
//初始化失败则退出
if (!initResult)
controller.shutdown();
System.exit(-3);
/*
* 9 添加关闭钩子方法,在Broker关闭之前执行,进行一些内存清理、对象销毁等操作
*/
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable()
private volatile boolean hasShutdown = false;
private AtomicInteger shutdownTimes = new AtomicInteger(0);
@Override
public void run()
synchronized (this)
log.info("Shutdown hook was invoked, ", this.shutdownTimes.incrementAndGet());
if (!this.hasShutdown)
this.hasShutdown = true;
long beginTime = System.currentTimeMillis();
//执行controller的shutdown方法,并且还会在messageStore#shutdown方法中将abort临时文件删除。
controller.shutdown();
long consumingTimeTotal = System.currentTimeMillis() - beginTime;
log.info("Shutdown hook over, consuming total time(ms): ", consumingTimeTotal);
, "ShutdownHook"));
//返回BrokerController
return controller;
catch (Throwable e)
e.printStackTrace();
System.exit(-1);
return null;
2.1 创建各种配置类
createBrokerController方法中,首先就会创建各种配置类。重要的broker的各种配置类如下:
-
BrokerConfig:Broker的配置类,包含Broker的各种配置,比如ROCKETMQ_HOME、namesrvAddr、brokerName、brokerId等属性。
-
NettyServerConfig:NettyServer的配置类,包含Broker作为服务端时的各种属性,比如客户端进行交互的时候。设置作为NettyServer时的监听端口为10911。即客户端与Broker通信时使用10911端口。
-
NettyClientConfig:NettyClient的配置类,包含Broker作为客户端时的各种属性,Broker还会作为客户端,比如与NameServer交互的时候。
-
MessageStoreConfig:Broker消息存储的配置类,包含了消息存储的相关配置。比如各种文件的目录、大小等信息。
然后还会将-c指令(c即configFile)指定的外部配置文件中的属性设置给这些配置类,我们可以通过在启动时追加类似于“-c xx/xx/xx/配置文件目录”的指令指定外部配置文件。
常见的外部配置文件配置的内容及其含义如下:
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
#删除文件的时间点,一天的固定时间执行一次删除过期文件操作,默认为凌晨4点。
deleteWhen = 04
#文件保留时间,也就是从最后一次更新时间到现在,如果超过了该时间,则认为是过期文件,可以被删除,单位小时
fileReservedTime = 48
#broker的角色,默认是异步master,即生产者发送的每一条消息只要写入master就返回告诉生产者成功。然后再“异步复制”到slave。
#同步master:Sync Broker:生产者发送的每一条消息都至少同步复制到一个slave后才返回告诉生产者成功,即“同步双写”。
brokerRole = ASYNC_MASTER
#消息刷盘策略,默认是异步刷盘。
#异步刷盘ASYNC_FLUSH:生产者发送的每一条消息并不是立即保存到磁盘,而是暂时缓存起来,然后就返回生产者成功。随后再异步的将缓存数据保存到磁盘,有两种情况:
#1是定期将缓存中更新的数据进行刷盘,2是当缓存中更新的数据条数达到某一设定值后进行自动刷盘。异步刷盘有较低概率导致消息丢失,比如在还未来得及同步到磁盘的时候宕机,但是性能更好。
#同步刷盘SYNC_FLUSH:生产者发送的每一条消息都在保存到磁盘成功后才返回告诉生产者成功。这种方式不会存在消息丢失的问题,但是有很大的磁盘IO开销,性能有一定影响。
flushDiskType = ASYNC_FLUSH
#nameserver的地址,也可以指定真实ip
namesrvAddr=127.0.0.1:9876
#brokerIp,也可以指定真实ip
brokerIP1=127.0.0.1
#消息存储根路径
storePathRootDir=F:/Idea/rocketmq/config/store
#commitLog文件的存储路径
storePathCommitLog=F:/Idea/rocketmq/config/store/commitlog
#consume queue文件的存储路径
storePathConsumeQueue=F:/Idea/rocketmq/config/store/consumequeue
#消息索引文件的存储路径
storePathIndex=F:/Idea/rocketmq/config/store/index
#checkpoint文件的存储路径
storeCheckpoint=F:/Idea/rocketmq/config/store/checkpoint
#abort文件的存储路径
abortFile=F:/Idea/rocketmq/config/store/abort
设置了配置信息之后,会及进行一系列的校验,例如:
-
ROCKETMQ_HOME校验:如果在启动参数中没有指定ROCKETMQ_HOME属性,那么打印异常并退出程序。ROCKETMQ_HOME就是指定RocketMQ的配置文件路径。
-
namesrvAddr校验:我们可以配置多个Name server 地址,以“;”分割,这里Broker会通过将各个Name server 的字符串地址转换为InetSocketAddress来校验各个地址的合法性。
-
设置、校验brokerId:如果broker是同步master或者异步master角色,则设置brokerId为0,如果是slave角色,则校验设置的brokerId如果不大于0,则打印异常,并推出程序。
2.2 创建broker控制器
在设置了创建了各种配置类并且解析设置了配置文件中的属性之后。将会根据BrokerConfig、NettyServerConfig、NettyClientConfig、MessageStoreConfig配置类这些配置类调用BrokerController的构造器创建一个BrokerController实例。
BrokerController的构造器实际上同样是在进行一系列的赋值和初始化操作,创建各种manager、queue等等各种组件。我们说过BrokerController相当于broker的一个中央控制器,各种组件角色之间的交互都是通过BrokerController来完成的,而不是组件的直接互相调用。
从下面的源码可以看出,实例化BrokerController的时候,会一并实例化很多的配置类和线程池队列。
public BrokerController(
final BrokerConfig brokerConfig,
final NettyServerConfig nettyServerConfig,
final NettyClientConfig nettyClientConfig,
final MessageStoreConfig messageStoreConfig
)
//broker的配置
this.brokerConfig = brokerConfig;
//作为netty服务端与客户端交互的配置
this.nettyServerConfig = nettyServerConfig;
//作为netty客户端与服务端交互的配置
this.nettyClientConfig = nettyClientConfig;
//消息存储的配置
this.messageStoreConfig = messageStoreConfig;
//消费者偏移量管理器,维护offset进度信息
this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new LmqConsumerOffsetManager(this) : new ConsumerOffsetManager(this);
//topic配置管理器,管理broker中存储的所有topic的配置
this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new LmqTopicConfigManager(this) : new TopicConfigManager(this);
//拉取消息处理器,用于处理拉取消息的请求
this.pullMessageProcessor = new PullMessageProcessor(this);
//拉取请求挂起服务,处理无消息时push长轮询消费者的挂起等待机制
this.pullRequestHoldService = messageStoreConfig.isEnableLmq() ? new LmqPullRequestHoldService(this) : new PullRequestHoldService(this);
//消息送达的监听器,生产者消息到达时通过该监听器触发pullRequestHoldService通知pullRequestHoldService
this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
//消费者id变化监听器
this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
//消费者管理类,维护消费者组的注册实例信息以及topic的订阅信息,并对消费者id变化进行监听
this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
//消费者过滤管理器,配置文件为:xx/config/consumerFilter.json以上是关于RocketMQ源码—Broker启动流程源码解析一万字的主要内容,如果未能解决你的问题,请参考以下文章
RocketMQ源码—Broker启动加载消息文件以及恢复数据源码一万字