6. RocketMQ Source Code Analysis: Broker Startup (Part 1)

Posted by carl-zhao

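The previous article covered RocketMQ's metadata management, which is implemented as a custom KV server, the NameServer. Other services register with the NameServer by sending their complete service information every time (full registration). When the RocketMQ topology contains several NameServer instances, the cluster keeps working as long as at least one of them is alive.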


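For RocketMQ to serve as a message queue, starting the NameServer to hold metadata is only the first step; the Broker has to be started as well. Only then can a Producer send messages to the Broker and a Consumer consume messages from it.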

The Broker's startup class is org.apache.rocketmq.broker.BrokerStartup. Let's look at what happens when the Broker starts.

1. Parsing the startup command

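BrokerStartup is only an entry point: it delegates the actual startup to the BrokerController class. BrokerController is the class that brings up the whole broker service, so RocketMQ first has to initialize it. Good middleware also has to be configurable, so before initializing BrokerController, RocketMQ parses the startup command. Recall the arguments we used to start the Broker while reading the source: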

-c /Users/carl/projects/idea/github/apache/rocketmq/config/conf/broker.conf -n localhost:9876

Here are the options the Broker startup command supports:

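Broker startup options

Short | Long | Description | Explanation
p | printConfigItem | Print all config item | Prints all configuration items
c | configFile | Broker config properties file | Path of the Broker's configuration file
h | help | Print help | Prints the help text
m | printImportantConfig | Print important config item | Prints all important configuration items (the BrokerConfig fields annotated with @ImportantField)
n | namesrvAddr | Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876 | List of NameServer addresses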
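
To make the option table concrete, here is a minimal sketch of how such arguments could be turned into a map keyed by long option name. It is a hand-rolled stand-in, not the parser RocketMQ actually uses; the class BrokerArgsSketch and its parse method are hypothetical names introduced for illustration, and only the option names come from the table above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class BrokerArgsSketch {

    // Short option -> long option, copied from the option table.
    private static final Map<String, String> SHORT_TO_LONG = new HashMap<>();
    // Options that expect a value; the others are plain flags.
    private static final Set<String> WITH_VALUE =
        new HashSet<>(Arrays.asList("configFile", "namesrvAddr"));

    static {
        SHORT_TO_LONG.put("p", "printConfigItem");
        SHORT_TO_LONG.put("c", "configFile");
        SHORT_TO_LONG.put("h", "help");
        SHORT_TO_LONG.put("m", "printImportantConfig");
        SHORT_TO_LONG.put("n", "namesrvAddr");
    }

    /** Parses args into a map keyed by long option name; flags map to "true". */
    public static Map<String, String> parse(String[] args) {
        Map<String, String> parsed = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            String arg = args[i];
            if (!arg.startsWith("-")) {
                throw new IllegalArgumentException("Unexpected argument: " + arg);
            }
            // Accept both "-c" and "--configFile".
            String name = arg.replaceFirst("^-{1,2}", "");
            String longName = SHORT_TO_LONG.getOrDefault(name, name);
            if (!SHORT_TO_LONG.containsValue(longName)) {
                throw new IllegalArgumentException("Unknown option: " + arg);
            }
            if (WITH_VALUE.contains(longName)) {
                if (i + 1 >= args.length) {
                    throw new IllegalArgumentException("Missing value for: " + arg);
                }
                parsed.put(longName, args[++i]);
            } else {
                parsed.put(longName, "true");
            }
        }
        return parsed;
    }

    public static void main(String[] args) {
        Map<String, String> parsed = parse(new String[] {
            "-c", "/path/to/broker.conf", "-n", "localhost:9876"});
        System.out.println("configFile  = " + parsed.get("configFile"));
        System.out.println("namesrvAddr = " + parsed.get("namesrvAddr"));
    }
}
```

Keying the result by long name means the rest of the startup code never has to care whether the user typed -c or --configFile.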

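The most important part is handling the Broker's configuration file path: the file is loaded into a Properties object, i.e. a set of key-value pairs, and its entries are then assigned to the following Broker configuration objects: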

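  • BrokerConfig: configuration for the Broker service itself
  • NettyServerConfig: configuration for the socket server the Broker exposes (built on Netty)
  • NettyClientConfig: configuration for the socket client the Broker uses for outbound calls (built on Netty)
  • MessageStoreConfig: configuration for the Broker's message storage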
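
Copying Properties entries onto a configuration object can be done in several ways. The sketch below shows one common approach, calling matching setters through reflection. It is an illustration under assumptions, not RocketMQ's code: DemoBrokerConfig is a hypothetical stand-in for a class such as BrokerConfig, and the helper name apply is made up for this example.

```java
import java.lang.reflect.Method;
import java.util.Properties;

public class PropertiesToConfigSketch {

    /** Hypothetical stand-in for a configuration class such as BrokerConfig. */
    public static class DemoBrokerConfig {
        private String brokerName = "DEFAULT_BROKER";
        private long brokerId = 0;
        private boolean autoCreateTopicEnable = true;

        public String getBrokerName() { return brokerName; }
        public void setBrokerName(String brokerName) { this.brokerName = brokerName; }
        public long getBrokerId() { return brokerId; }
        public void setBrokerId(long brokerId) { this.brokerId = brokerId; }
        public boolean isAutoCreateTopicEnable() { return autoCreateTopicEnable; }
        public void setAutoCreateTopicEnable(boolean enable) { this.autoCreateTopicEnable = enable; }
    }

    /** For every setXxx(value) on target, applies the property named "xxx" if present. */
    public static void apply(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() <= 3 || m.getParameterCount() != 1) {
                continue;
            }
            // setBrokerName -> brokerName
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String raw = props.getProperty(key);
            if (raw == null) {
                continue; // keep the default value
            }
            Class<?> type = m.getParameterTypes()[0];
            Object value;
            if (type == String.class) {
                value = raw;
            } else if (type == int.class || type == Integer.class) {
                value = Integer.parseInt(raw.trim());
            } else if (type == long.class || type == Long.class) {
                value = Long.parseLong(raw.trim());
            } else if (type == boolean.class || type == Boolean.class) {
                value = Boolean.parseBoolean(raw.trim());
            } else {
                continue; // unsupported type in this sketch
            }
            try {
                m.invoke(target, value);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot set " + key, e);
            }
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("brokerName", "broker-a");
        props.setProperty("brokerId", "1");
        DemoBrokerConfig config = new DemoBrokerConfig();
        apply(props, config);
        System.out.println(config.getBrokerName() + " / " + config.getBrokerId());
    }
}
```

Because only keys with a matching setter are applied, the same Properties object can be passed to several configuration objects in turn, and each one picks up just the entries it understands.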

2. Initializing BrokerController

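After the configuration file has been parsed and its entries applied to the four configuration objects above, BrokerStartup calls the BrokerController constructor to create the controller.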

BrokerStartup.java

// Create the BrokerController object
final BrokerController controller = new BrokerController(
    brokerConfig,
    nettyServerConfig,
    nettyClientConfig,
    messageStoreConfig);

Let's see what the BrokerController constructor does.

    public BrokerController(
        final BrokerConfig brokerConfig,
        final NettyServerConfig nettyServerConfig,
        final NettyClientConfig nettyClientConfig,
        final MessageStoreConfig messageStoreConfig
    ) {
        this.brokerConfig = brokerConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.nettyClientConfig = nettyClientConfig;
        this.messageStoreConfig = messageStoreConfig;
        // Consumer offset manager
        this.consumerOffsetManager = new ConsumerOffsetManager(this);
        // Topic config manager for the Broker
        this.topicConfigManager = new TopicConfigManager(this);
        // Processor for Consumer pull requests
        this.pullMessageProcessor = new PullMessageProcessor(this);
        // Long-polling support for Consumer pull requests
        this.pullRequestHoldService = new PullRequestHoldService(this);
        // Listener for newly arrived Producer messages (notifies PullRequestHoldService)
        this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
        this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
        // Consumer manager (consumer and consumer-group information)
        this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
        // Consumer filter manager
        this.consumerFilterManager = new ConsumerFilterManager(this);
        // Producer manager (e.g. evicting expired producers)
        this.producerManager = new ProducerManager();
        // Heartbeat housekeeping (Producer/Consumer/FilterServer)
        this.clientHousekeepingService = new ClientHousekeepingService(this);
        // Broker-to-client communication
        this.broker2Client = new Broker2Client(this);
        // Consumer (subscription group) config manager
        this.subscriptionGroupManager = new SubscriptionGroupManager(this);
        // Broker's outbound API (towards the NameServer)
        this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
        // Filter server manager for Consumer message filtering
        this.filterServerManager = new FilterServerManager(this);
        // Master-slave synchronization
        this.slaveSynchronize = new SlaveSynchronize(this);

        // Bounded task queues, one per request type; capacities come from BrokerConfig
        this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
        this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
        this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
        this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
        this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
        this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
        this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
        this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());

        this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
        this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));

        this.brokerFastFailure = new BrokerFastFailure(this);
        this.configuration = new Configuration(
            log,
            BrokerPathConfigHelper.getBrokerConfigPath(),
            this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
        );
    }

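The Broker's main components created in the constructor are annotated above. To understand what each one does, it helps to read the code alongside the feature list on RocketMQ's GitHub page. Once the BrokerController object has been constructed, BrokerController#initialize is called to initialize it.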
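
The constructor creates one bounded LinkedBlockingQueue per request type, and initialize later hands each queue to a fixed-size thread pool. The sketch below illustrates what that combination implies, assuming the broker's executor behaves like a plain java.util.concurrent.ThreadPoolExecutor with the default rejection policy: once every worker is busy and the queue is full, further tasks are rejected instead of piling up in memory. BoundedPoolSketch and its method names are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPoolSketch {

    /** Thread factory that names threads prefix1, prefix2, ... */
    static ThreadFactory named(String prefix) {
        AtomicInteger index = new AtomicInteger();
        return r -> new Thread(r, prefix + index.incrementAndGet());
    }

    /** Returns true if a task is rejected once the single worker is busy and the queue is full. */
    public static boolean rejectsWhenFull(int queueCapacity) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(queueCapacity);
        // corePoolSize == maximumPoolSize gives a fixed-size pool, as in initialize().
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 1000 * 60, TimeUnit.MILLISECONDS, queue, named("DemoThread_"));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        boolean rejected = false;
        try {
            pool.execute(blocker);                 // goes straight to the only worker
            for (int i = 0; i < queueCapacity; i++) {
                pool.execute(blocker);             // these fill the bounded queue
            }
            try {
                pool.execute(blocker);             // no worker free, no queue slot left
            } catch (RejectedExecutionException e) {
                rejected = true;
            }
        } finally {
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected when full: " + rejectsWhenFull(2));
    }
}
```

Bounding the queue trades occasional rejections for a hard limit on how much work can back up, which is usually the safer default for a server that must stay responsive under load.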

BrokerController#initialize

    public boolean initialize() throws CloneNotSupportedException {
        // Load topic config, consumer offsets, subscription groups and consumer filters
        boolean result = this.topicConfigManager.load();

        result = result && this.consumerOffsetManager.load();
        result = result && this.subscriptionGroupManager.load();
        result = result && this.consumerFilterManager.load();

        if (result) {
            try {
                // Create the message store
                this.messageStore =
                    new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                        this.brokerConfig);
                if (messageStoreConfig.isEnableDLegerCommitLog()) {
                    DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
                    ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
                }
                this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
                //load plugin
                MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
                this.messageStore = MessageStoreFactory.build(context, this.messageStore);
                this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
            } catch (IOException e) {
                result = false;
                log.error("Failed to initialize", e);
            }
        }

        result = result && this.messageStore.load();

        if (result) {
            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
            NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
            fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
            this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
            this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getSendMessageThreadPoolNums(),
                this.brokerConfig.getSendMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.sendThreadPoolQueue,
                new ThreadFactoryImpl("SendMessageThread_"));

            this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getPullMessageThreadPoolNums(),
                this.brokerConfig.getPullMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.pullThreadPoolQueue,
                new ThreadFactoryImpl("PullMessageThread_"));

            this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
                this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.replyThreadPoolQueue,
                new ThreadFactoryImpl("ProcessReplyMessageThread_"));

            this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getQueryMessageThreadPoolNums(),
                this.brokerConfig.getQueryMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.queryThreadPoolQueue,
                new ThreadFactoryImpl("QueryMessageThread_"));

            this.adminBrokerExecutor =
                Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
                    "AdminBrokerThread_"));

            this.clientManageExecutor = new ThreadPoolExecutor(
                this.brokerConfig.getClientManageThreadPoolNums(),
                this.brokerConfig.getClientManageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.clientManagerThreadPoolQueue,
                new ThreadFactoryImpl("ClientManageThread_"));

            this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getHeartbeatThreadPoolNums(),
                this.brokerConfig.getHeartbeatThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.heartbeatThreadPoolQueue,
                new ThreadFactoryImpl("HeartbeatThread_", true));

            this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getEndTransactionThreadPoolNums(),
                this.brokerConfig.getEndTransactionThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.endTransactionThreadPoolQueue,
                new ThreadFactoryImpl("EndTransactionThread_"));

            this.consumerManageExecutor =
                Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
                    "ConsumerManageThread_"));

            this.registerProcessor();

            final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
            final long period = 1000 * 60 * 60 * 24;
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.getBrokerStats().record();
                    } catch (Throwable e) {
                        log.error("schedule record error.", e);
                    }
                }
            }, initialDelay, period, TimeUnit.MILLISECONDS);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerOffsetManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumerOffset error.", e);
                    }
                }
            }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerFilterManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumer filter error.", e);
                    }
                }
            }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.protectBroker();
                    } catch (Throwable e) {
                        log.error("protectBroker error.", e);
                    }
                }
            }, 3, 3, TimeUnit.MINUTES);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.printWaterMark();
                    } catch (Throwable e) {
                        log.error("printWaterMark error.", e);
                    }
                }
            }, 10, 1, TimeUnit.SECONDS);

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    // ... (the listing is cut off here in the source)
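
The first scheduled task in initialize runs once a day: its initial delay is the time remaining until the next morning, and its period is 24 hours. The sketch below reproduces that pattern with java.time, assuming that UtilAll.computeNextMorningTimeMillis returns the timestamp of the next local midnight; nextMidnightMillis and scheduleDaily are hypothetical helpers written for this example. It also shows why each task body in the listing is wrapped in try/catch: with scheduleAtFixedRate, an exception that escapes run() suppresses all subsequent executions.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DailyTaskSketch {

    /** Epoch millis of the next 00:00:00.000 in the given zone, strictly after nowMillis. */
    public static long nextMidnightMillis(long nowMillis, ZoneId zone) {
        return Instant.ofEpochMilli(nowMillis)
            .atZone(zone)
            .toLocalDate()
            .plusDays(1)
            .atStartOfDay(zone)
            .toInstant()
            .toEpochMilli();
    }

    /** Schedules task to run at the next midnight and then every 24 hours. */
    public static void scheduleDaily(ScheduledExecutorService scheduler, Runnable task) {
        long now = System.currentTimeMillis();
        long initialDelay = nextMidnightMillis(now, ZoneId.systemDefault()) - now;
        long period = TimeUnit.DAYS.toMillis(1);
        scheduler.scheduleAtFixedRate(() -> {
            try {
                task.run();
            } catch (Throwable e) {
                // Log and continue: an exception escaping here would cancel future runs.
                System.err.println("daily task failed: " + e);
            }
        }, initialDelay, period, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduleDaily(scheduler, () -> System.out.println("record daily stats"));
        // Demo only: stop immediately instead of waiting for midnight.
        scheduler.shutdownNow();
    }
}
```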
