5RocketMQ 源码解析之 命名服务启动

Posted carl-zhao

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了5RocketMQ 源码解析之 命名服务启动相关的知识,希望对你有一定的参考价值。

在 RocketMQ 当中,消息发送方以及消息接收方都是配置命名服务(Name Server)的地址。通过命名服务解耦合了消息发送者以及消息接收方,不同于 Kafka 直接连接 Broker 地址。命名服务的主要功能包含:Broker 管理以及消息的路由管理。具有如下:

  • Broker 管理,NameServer接受来自 Broker 集群的注册,并提供心跳机制来检查Broker是否活着
  • 路由管理,每个NameServer将保存关于Broker集群的整个路由信息和供客户端查询的 queue (队列) 信息。

源码分析基于 RocketMQ - 4.9.3

1、NameServer 启动整体流程

从之前 3、RocketMQ 源码解析之 源代码环境搭建 这篇文章中我可以看到命名服务的启动类是:NamesrvStartup。而 命名服务最重要的类其实是 NamesrvController,它控制着命名服务的整个流程。命名服务启动其实就是调用NamesrvController的三个方法的过程:

  • NamesrvController#<init>:通过有参构建器传入 NamesrvConfig 以及 NettyServerConfig 这两个类,创建NamesrvController对象。
  • NamesrvController#initialize:调用该方法对 NamesrvController对象进行属性初始化。
  • NamesrvController#start: 调用该方法完成对 NameServer 的启动,启动 NettyRemotingServer 暴露 Netty 服务端的 Socket 服务。

下面我们就来详细的分析一下这三个过程。

2、NamesrvController 对象构建

NamesrvController.init

    public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) 
        this.namesrvConfig = namesrvConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.kvConfigManager = new KVConfigManager(this);
        this.routeInfoManager = new RouteInfoManager();
        this.brokerHousekeepingService = new BrokerHousekeepingService(this);
        this.configuration = new Configuration(
            log,
            this.namesrvConfig, this.nettyServerConfig
        );
        this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
    

在 NamesrvController 对象构建里面最核心的还是创建 RouteInfoManager 对象,这个对象里面保存了就完成了 Broker 管理 以及 消息的路由管理

下面我们来看一下这个对象里面的核心字段:

  • HashMap<String/* topic */, List<QueueData>> topicQueueTable :消息主题以及对应的 队列数据映射,在 RocketMQ 当中,一个消息主题可以发送不到同的 Queue 当中,达到负载均衡的目的。
  • HashMap<String/* brokerName */, BrokerData> brokerAddrTable :broker 名称以及 Broker 数据的映射信息。
  • HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable:集群名称以及对应的 Broker 名称列表
  • HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable:broker 地址对应 Broker 通道(Channel) 的对应信息。
  • HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable:broker 地址以及消息的各种过滤机制。

这里包含了 Broker 对应的所有元数据信息。Broker 每过一段时间(默认 30 秒)就会向 NameServer 发送心跳,告诉 NameServer 我当前还是存活的。并且如果心跳超过 2 分钟没有发送就会把这个 Broker 从上面的 brokerLiveTable 列表当中移除。

RouteInfoManager#scanNotActiveBroker

    public void scanNotActiveBroker() 
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) 
            Entry<String, BrokerLiveInfo> next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) 
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired,  ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            
        
    

上面就是 Broker 存活检测逻辑。

3、NamesrvController 初始化

下面我们来看一下 的初始化逻辑。

NamesrvController#initialize

    public boolean initialize() 

        this.kvConfigManager.load();

        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

        this.registerProcessor();

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() 

            @Override
            public void run() 
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            
        , 5, 10, TimeUnit.SECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() 

            @Override
            public void run() 
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            
        , 1, 10, TimeUnit.MINUTES);

        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) 
            // Register a listener to reload SslContext
            try 
                fileWatchService = new FileWatchService(
                    new String[] 
                        TlsSystemConfig.tlsServerCertPath,
                        TlsSystemConfig.tlsServerKeyPath,
                        TlsSystemConfig.tlsServerTrustCertPath
                    ,
                    new FileWatchService.Listener() 
                        boolean certChanged, keyChanged = false;
                        @Override
                        public void onChanged(String path) 
                            if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) 
                                log.info("The trust certificate changed, reload the ssl context");
                                reloadServerSslContext();
                            
                            if (path.equals(TlsSystemConfig.tlsServerCertPath)) 
                                certChanged = true;
                            
                            if (path.equals(TlsSystemConfig.tlsServerKeyPath)) 
                                keyChanged = true;
                            
                            if (certChanged && keyChanged) 
                                log.info("The certificate and private key changed, reload the ssl context");
                                certChanged = keyChanged = false;
                                reloadServerSslContext();
                            
                        
                        private void reloadServerSslContext() 
                            ((NettyRemotingServer) remotingServer).loadSslContext();
                        
                    );
             catch (Exception e) 
                log.warn("FileWatchService created error, can't load the certificate dynamically");
            
        

        return true;
    

上面的逻辑其实挺简单的,主要的逻辑如下:

  • kvConfigManager KV 类型的配置管理器从配置文件中加载配置
  • 创建 Netty 远程服务 NettyRemotingServer 暴露命名服务 Socket 通信
  • 创建一个远程线程池 remotingExecutor 用于异步处理客户端的 Socket 连接请求,默认没 8 个工作线程。
  • registerProcessor():向 NettyRemotingServer 远程服务端注册请求处理器(DefaultRequestProcessor,这个处理器我们稍后在分析),并且使用上一步创建的 remotingExecutor进行异步处理
  • 使用单个线程池调用 RouteInfoManager#scanNotActiveBroker 扫描并删除 2 分钟没有心跳的 Broker 以及 KVConfigManager#printAllPeriodically 打印kvConfigManager KV 类型的配置管理器里面的配置值。
  • 如果服务器开启了 SSL 加密传输就启动 FileWatchService 定时扫描文件更新 SSL

DefaultRequestProcessor 其实就是整个 NameServer 处理 Socket 请求的类.其实最终就是操作 KVConfigManager 这个配置类或者 RouteInfoManager 这个类。

4、NamesrvController 启动

NamesrvController 的启动就相对比较简单了。

    public void start() throws Exception 
        this.remotingServer.start();

        if (this.fileWatchService != null) 
            this.fileWatchService.start();
        
    

它的只包括了两个步骤:

  • NettyRemotingServer#start,调用 Netty 的服务端 ServerBootstrap 进行服务端启动暴露 Socket 服务,默认是 9876;
  • 启动 FileWatchService 服务,当 SSL 有更新时,就更新 SSL 配置。
与50位技术专家面对面 20年技术见证,附赠技术全景图

以上是关于5RocketMQ 源码解析之 命名服务启动的主要内容,如果未能解决你的问题,请参考以下文章

zookeeper源码之服务端启动模块

6RocketMQ 源码解析之 Broker 启动(上)

6RocketMQ 源码解析之 Broker 启动(上)

4Nacos 配置中心源码解析之 服务端启动

4Nacos 配置中心源码解析之 服务端启动

4Nacos 配置中心源码解析之 服务端启动