6. RocketMQ Source Code Analysis: Broker Startup (Part 1)
Posted carl-zhao
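The previous article covered RocketMQ's metadata management, which is implemented as a custom KV server, the NameServer. Other services always register their full service information when they register with the NameServer. When the RocketMQ topology contains several NameServers, the cluster keeps working as long as at least one of them is alive.
For RocketMQ to serve as a message queue, starting the NameServer to hold the metadata is only the first step; the Broker has to be started as well. Only then can a Producer send messages to the Broker and a Consumer consume messages from it.
The Broker's startup class is org.apache.rocketmq.broker.BrokerStartup. Let's look at what happens when the Broker starts.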
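1. Parsing the startup command
BrokerStartup is only a launcher: it delegates the actual startup to the BrokerController class. BrokerController is the class that brings up the whole MQ service, so RocketMQ first has to initialize this object. A capable piece of middleware also has to be configurable, so before initializing BrokerController, RocketMQ parses the startup command. Recall the arguments we used to start the Broker when reading the source code: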
-c /Users/carl/projects/idea/github/apache/rocketmq/config/conf/broker.conf -n localhost:9876
Let's look at the options the Broker startup command supports:
Broker startup options
Short option | Long option | Description | Notes |
---|---|---|---|
p | printConfigItem | Print all config item | Prints all configuration items |
c | configFile | Broker config properties file | Path of the Broker configuration file |
h | help | Print help | Prints the help text |
m | printImportantConfig | Print important config item | Prints the important configuration items (every BrokerConfig field annotated with @ImportantField) |
n | namesrvAddr | Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876 | List of NameServer addresses |
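To make the option table concrete, here is a minimal sketch of how short options such as -c and -n can be turned into key/value pairs. It is a hand-written illustration in plain Java and is not the parser RocketMQ itself uses; the class StartupArgsSketch, its parse method, and the path /path/to/broker.conf are hypothetical names chosen for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

final class StartupArgsSketch {
    // Options that expect a value (-c <file>, -n <addresses>).
    private static final Set<String> WITH_VALUE = Set.of("c", "n");
    // Options that are plain switches (-p, -h, -m).
    private static final Set<String> FLAGS = Set.of("p", "h", "m");

    // Turns an argument array into an ordered map of option name to value.
    static Map<String, String> parse(String[] args) {
        Map<String, String> parsed = new LinkedHashMap<>();
        for (int i = 0; i < args.length; i++) {
            String arg = args[i];
            if (!arg.startsWith("-") || arg.length() < 2) {
                throw new IllegalArgumentException("Unexpected argument: " + arg);
            }
            String name = arg.substring(1);
            if (WITH_VALUE.contains(name)) {
                if (i + 1 >= args.length) {
                    throw new IllegalArgumentException("Missing value for -" + name);
                }
                parsed.put(name, args[++i]); // consume the value that follows
            } else if (FLAGS.contains(name)) {
                parsed.put(name, "true");
            } else {
                throw new IllegalArgumentException("Unknown option: " + arg);
            }
        }
        return parsed;
    }

    public static void main(String[] args) {
        Map<String, String> parsed = parse(new String[] {
            "-c", "/path/to/broker.conf", "-n", "localhost:9876"});
        System.out.println(parsed);
    }
}
```

In this sketch the value stored under c would then be used to locate the configuration file, and the value under n would override the NameServer address.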
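The most important part is handling the Broker configuration file: the file is parsed into a Properties object, that is, a set of key/value pairs, and its entries are then assigned to the following Broker configuration objects.
- BrokerConfig: configuration for the Broker service itself
- NettyServerConfig: configuration for the socket service the Broker exposes (using Netty)
- NettyClientConfig: configuration for the socket calls the Broker makes to other services (using Netty)
- MessageStoreConfig: configuration for the Broker's message storage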
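The step from a Properties object to a configuration object can be sketched with a little reflection: for each key in the file, look for a matching setter and call it. The code below is only an illustration of that idea under stated assumptions. DemoBrokerConfig is a hypothetical stand-in with two fields, and the helper names load and bind are invented for this example, not taken from RocketMQ.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.lang.reflect.Method;
import java.util.Properties;

final class ConfigBindingSketch {

    // Hypothetical stand-in for a configuration class such as BrokerConfig.
    static final class DemoBrokerConfig {
        private String brokerName = "DEFAULT_BROKER";
        private int sendMessageThreadPoolNums = 1;

        public String getBrokerName() { return brokerName; }
        public void setBrokerName(String brokerName) { this.brokerName = brokerName; }
        public int getSendMessageThreadPoolNums() { return sendMessageThreadPoolNums; }
        public void setSendMessageThreadPoolNums(int nums) { this.sendMessageThreadPoolNums = nums; }
    }

    // Parses "key=value" lines into a Properties object.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Calls setXxx(value) on the target for every property named xxx.
    static void bind(Properties props, Object target) {
        for (Method method : target.getClass().getMethods()) {
            String name = method.getName();
            if (!name.startsWith("set") || name.length() <= 3 || method.getParameterCount() != 1) {
                continue;
            }
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String raw = props.getProperty(key);
            if (raw == null) {
                continue; // keep the default when the file does not mention the key
            }
            Class<?> type = method.getParameterTypes()[0];
            Object value;
            if (type == String.class) {
                value = raw;
            } else if (type == int.class) {
                value = Integer.parseInt(raw.trim());
            } else if (type == long.class) {
                value = Long.parseLong(raw.trim());
            } else if (type == boolean.class) {
                value = Boolean.parseBoolean(raw.trim());
            } else {
                continue; // other types are skipped in this sketch
            }
            try {
                method.invoke(target, value);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot set " + key, e);
            }
        }
    }

    public static void main(String[] args) {
        DemoBrokerConfig config = new DemoBrokerConfig();
        bind(load("brokerName=broker-a\nsendMessageThreadPoolNums=4\n"), config);
        System.out.println(config.getBrokerName() + " " + config.getSendMessageThreadPoolNums());
    }
}
```

Keys that are absent from the file leave the defaults of the configuration object untouched, which is why each configuration class can ship with sensible default values.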
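2. Initializing BrokerController
After the configuration file has been parsed and its entries assigned to the four configuration objects above, BrokerStartup calls the BrokerController constructor to create the controller.
BrokerStartup.java
// Create the BrokerController object
final BrokerController controller = new BrokerController(
    brokerConfig,
    nettyServerConfig,
    nettyClientConfig,
    messageStoreConfig);
Let's look at what the BrokerController constructor does.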
public BrokerController(
    final BrokerConfig brokerConfig,
    final NettyServerConfig nettyServerConfig,
    final NettyClientConfig nettyClientConfig,
    final MessageStoreConfig messageStoreConfig
) {
    this.brokerConfig = brokerConfig;
    this.nettyServerConfig = nettyServerConfig;
    this.nettyClientConfig = nettyClientConfig;
    this.messageStoreConfig = messageStoreConfig;
    // Consumer offset manager
    this.consumerOffsetManager = new ConsumerOffsetManager(this);
    // Topic manager of the Broker
    this.topicConfigManager = new TopicConfigManager(this);
    // Processor for Consumer pull requests
    this.pullMessageProcessor = new PullMessageProcessor(this);
    // Long-polling mechanism for Consumer pull requests
    this.pullRequestHoldService = new PullRequestHoldService(this);
    // Listener for messages arriving from Producers (notifies PullRequestHoldService)
    this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
    this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
    // Consumer manager (consumer and consumer group information)
    this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
    // Consumer filter manager
    this.consumerFilterManager = new ConsumerFilterManager(this);
    // Producer manager (for example, evicting expired producers)
    this.producerManager = new ProducerManager();
    // Heartbeat housekeeping (Producer/Consumer/FilterServer)
    this.clientHousekeepingService = new ClientHousekeepingService(this);
    // Broker-to-client communication
    this.broker2Client = new Broker2Client(this);
    // Consumer (subscription group) configuration manager
    this.subscriptionGroupManager = new SubscriptionGroupManager(this);
    // API for the Broker's outbound calls (to the NameServer)
    this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
    // Manager for Consumer message filter servers
    this.filterServerManager = new FilterServerManager(this);
    // Master-slave synchronization
    this.slaveSynchronize = new SlaveSynchronize(this);
    this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
    this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
    this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
    this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
    this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
    this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
    this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
    this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
    this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
    this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
    this.brokerFastFailure = new BrokerFastFailure(this);
    this.configuration = new Configuration(
        log,
        BrokerPathConfigHelper.getBrokerConfigPath(),
        this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
    );
}
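The main functional classes of the Broker are annotated in the code above; they are easier to follow when read alongside the feature list on RocketMQ's GitHub page. Once the BrokerController object has been constructed, BrokerController#initialize is called to initialize it.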
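Among the classes annotated in the constructor, the pair PullRequestHoldService and NotifyMessageArrivingListener is worth a closer look: a pull request that finds no message is held, and the listener wakes it up when a message arrives. The following is a minimal sketch of that relationship, assuming one pending result per held request. HoldService, ArrivingListener, and their methods are hypothetical simplifications, not the RocketMQ classes, and the sketch leaves out everything else a real broker needs, such as timeouts and offsets.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class LongPollingSketch {

    // Hypothetical stand-in for the hold service: parks pull requests per topic.
    static final class HoldService {
        private final Map<String, List<CompletableFuture<String>>> held = new HashMap<>();

        // Called when a pull finds nothing to return: the request waits instead of failing.
        synchronized CompletableFuture<String> hold(String topic) {
            CompletableFuture<String> pending = new CompletableFuture<>();
            held.computeIfAbsent(topic, t -> new ArrayList<>()).add(pending);
            return pending;
        }

        // Completes every request that was waiting on the topic.
        synchronized void notifyMessageArriving(String topic, String message) {
            List<CompletableFuture<String>> waiting = held.remove(topic);
            if (waiting != null) {
                for (CompletableFuture<String> pending : waiting) {
                    pending.complete(message);
                }
            }
        }
    }

    // Hypothetical stand-in for the arriving listener: forwards to the hold service.
    static final class ArrivingListener {
        private final HoldService holdService;

        ArrivingListener(HoldService holdService) {
            this.holdService = holdService;
        }

        void arriving(String topic, String message) {
            holdService.notifyMessageArriving(topic, message);
        }
    }

    public static void main(String[] args) {
        HoldService holdService = new HoldService();
        ArrivingListener listener = new ArrivingListener(holdService);

        CompletableFuture<String> pending = holdService.hold("TopicTest");
        System.out.println("done before arrival: " + pending.isDone());
        listener.arriving("TopicTest", "hello");
        System.out.println("received: " + pending.join());
    }
}
```

This also shows why the constructor passes pullRequestHoldService into NotifyMessageArrivingListener: the listener needs a handle on the held requests in order to release them.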
BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {
    boolean result = this.topicConfigManager.load();
    result = result && this.consumerOffsetManager.load();
    result = result && this.subscriptionGroupManager.load();
    result = result && this.consumerFilterManager.load();
    if (result) {
        try {
            this.messageStore =
                new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                    this.brokerConfig);
            if (messageStoreConfig.isEnableDLegerCommitLog()) {
                DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
                ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
            }
            this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
            //load plugin
            MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
            this.messageStore = MessageStoreFactory.build(context, this.messageStore);
            this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
        } catch (IOException e) {
            result = false;
            log.error("Failed to initialize", e);
        }
    }
    result = result && this.messageStore.load();
    if (result) {
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
        NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
        fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
        this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
        this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getSendMessageThreadPoolNums(),
            this.brokerConfig.getSendMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.sendThreadPoolQueue,
            new ThreadFactoryImpl("SendMessageThread_"));
        this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getPullMessageThreadPoolNums(),
            this.brokerConfig.getPullMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.pullThreadPoolQueue,
            new ThreadFactoryImpl("PullMessageThread_"));
        this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
            this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.replyThreadPoolQueue,
            new ThreadFactoryImpl("ProcessReplyMessageThread_"));
        this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getQueryMessageThreadPoolNums(),
            this.brokerConfig.getQueryMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.queryThreadPoolQueue,
            new ThreadFactoryImpl("QueryMessageThread_"));
        this.adminBrokerExecutor =
            Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
                "AdminBrokerThread_"));
        this.clientManageExecutor = new ThreadPoolExecutor(
            this.brokerConfig.getClientManageThreadPoolNums(),
            this.brokerConfig.getClientManageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.clientManagerThreadPoolQueue,
            new ThreadFactoryImpl("ClientManageThread_"));
        this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getHeartbeatThreadPoolNums(),
            this.brokerConfig.getHeartbeatThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.heartbeatThreadPoolQueue,
            new ThreadFactoryImpl("HeartbeatThread_", true));
        this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getEndTransactionThreadPoolNums(),
            this.brokerConfig.getEndTransactionThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.endTransactionThreadPoolQueue,
            new ThreadFactoryImpl("EndTransactionThread_"));
        this.consumerManageExecutor =
            Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
                "ConsumerManageThread_"));
        this.registerProcessor();
        final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
        final long period = 1000 * 60 * 60 * 24;
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.getBrokerStats().record();
                } catch (Throwable e) {
                    log.error("schedule record error.", e);
                }
            }
        }, initialDelay, period, TimeUnit.MILLISECONDS);
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.consumerOffsetManager.persist();
                } catch (Throwable e) {
                    log.error("schedule persist consumerOffset error.", e);
                }
            }
        }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.consumerFilterManager.persist();
                } catch (Throwable e) {
                    log.error("schedule persist consumer filter error.", e);
                }
            }
        }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.protectBroker();
                } catch (Throwable e) {
                    log.error("protectBroker error.", e);
                }
            }
        }, 3, 3, TimeUnit.MINUTES);
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.printWaterMark();
                } catch (Throwable e) {
                    log.error("printWaterMark error.", e);
                }
            }
        }, 10, 1, TimeUnit.SECONDS);
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
                public void run
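The scheduled tasks in initialize share two details that are easy to miss: the daily statistics task is delayed until the next morning before it repeats every 24 hours, and every task body is wrapped in try/catch. The wrapping matters because ScheduledExecutorService.scheduleAtFixedRate suppresses all later runs of a task once one run throws. The sketch below reproduces both details with the JDK only. It assumes that UtilAll.computeNextMorningTimeMillis() returns the timestamp of the next local midnight, as its name suggests; DailyScheduleSketch and millisUntilNextMidnight are hypothetical names.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class DailyScheduleSketch {

    // Milliseconds from 'now' until the next local midnight (time zone changes are ignored).
    static long millisUntilNextMidnight(LocalDateTime now) {
        LocalDateTime nextMidnight = now.toLocalDate().plusDays(1).atStartOfDay();
        return Duration.between(now, nextMidnight).toMillis();
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        long initialDelay = millisUntilNextMidnight(LocalDateTime.now());
        long period = TimeUnit.DAYS.toMillis(1);

        scheduler.scheduleAtFixedRate(() -> {
            try {
                System.out.println("record daily statistics");
            } catch (Throwable e) {
                // Swallow and log: an exception escaping here would cancel all later runs.
                System.err.println("schedule record error: " + e);
            }
        }, initialDelay, period, TimeUnit.MILLISECONDS);

        System.out.println("first run in " + initialDelay + " ms, then every " + period + " ms");
        // This demo does not wait for midnight; a real service would keep the scheduler alive.
        scheduler.shutdownNow();
    }
}
```

Aligning the first run to midnight keeps the daily statistics on calendar-day boundaries regardless of when the Broker was started.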