RocketMQ源码—NameServer启动流程源码解析

Posted 刘Java

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码—NameServer启动流程源码解析相关的知识,希望对你有一定的参考价值。

详细介绍了RocketMQ的NameServer启动流程源码解析,包括RocketMQ的RPC通信模型。

文章目录

0 NameServer概述

我们要先学会了如何使用RocketMQ,并且看了官方文档之后再来看源码,那样就能节约很多的时间。比如RocketMQ的架构设计,以及Apache RocketMQ开发者指南

下面是RocketMQ的架构设计图:

官方文档中可以得知,NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:

  1. Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;

  2. 路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。

从上面可以得知,无论是Producer、Conumser、Broker都会直接和NameServer进行通信,实际上NameServer是非常重要的角色。但是由于大多数程序员都是一个“使用者”的角色,而使用RocketMQ的API的时候一般也不会接触到NameServer,因此对于NameServer的了解较少。更多的NameServer的特性需要查看官方文档

实际上RocketMQ在部署启动时,会首先启动NameServer,因此本系列的源码分析文章中,将会以NameServer的启动作为入口,一步步的向后分析Broker、Consumer、Producer的核心源码。

1 NamesrvStartup启动入口

NameServer的启动入口就是namesrv模块的NamesrvStartup类的main方法。该方法将会创建并且初始化一个NamesrvController实例。

public static void main(String[] args) 
     main0(args);
 

 public static NamesrvController main0(String[] args) 

     try 
         /*
          * 1、创建NamesrvController
          */
         NamesrvController controller = createNamesrvController(args);
/*
 * 2、启动NamesrvController
 */
         start(controller);
/*
 * 启动成功打印日志
 */
         String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
         log.info(tip);
         System.out.printf("%s%n", tip);
         return controller;
      catch (Throwable e) 
         e.printStackTrace();
         System.exit(-1);
     

     return null;
 

可以看到入口方法还是比较简单的,我们主要看createNamesrvController创建以及start启动NamesrvController的两个方法的源码。

2 createNamesrvController创建NamesrvController

该方法主要是解析命令行,加载NameServer配置和NettyServer各种配置(解析命令行中-c指定的配置文件)并保存起来然后创建一个NamesrvController。NamesrvController相当于NameServer的一个中央控制器类。

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException 
    //设置RocketMQ的版本信息,属性名为rocketmq.remoting.version
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    //PackageConflictDetect.detectFastjson();
    /*jar包启动时,构建命令行操作的指令,使用main方法启动可以忽略*/
    Options options = ServerUtil.buildCommandlineOptions(new Options());
    //mqnamesrv命令文件
    commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
    if (null == commandLine) 
        System.exit(-1);
        return null;
    
    //创建NameServer的配置类,包含NameServer的配置,比如ROCKETMQ_HOME
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    //和NettyServer的配置类
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    //netty服务的监听端口设置为9876
    nettyServerConfig.setListenPort(9876);
    //判断命令行中是否包含字符'c',即是否包含通过命令行指定配置文件的命令
    //例如,启动Broker的时候添加的 -c /Volumes/Samsung/Idea/rocketmq/config/conf/broker.conf命令
    if (commandLine.hasOption('c')) 
        /*解析配置文件并且存入NamesrvConfig和NettyServerConfig中,没有的话就不用管*/
        String file = commandLine.getOptionValue('c');
        if (file != null) 
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);

            namesrvConfig.setConfigStorePath(file);

            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        
    
    /*判断命令行中是否包含字符'p',如果存在则打印配置信息并结束jvm运行,没有的话就不用管*/
    if (commandLine.hasOption('p')) 
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        System.exit(0);
    
    //把命令行的配置解析到namesrvConfig
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
    //如果不存在ROCKETMQ_HOME的配置,那么打印异常并退出程序,这就是最开始启动NameServer是抛出异常的位置
    if (null == namesrvConfig.getRocketmqHome()) 
        System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    
    /*一系列日志的配置*/
    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
    JoranConfigurator configurator = new JoranConfigurator();
    configurator.setContext(lc);
    lc.reset();
    configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

    log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    //打印nameServer 服务器配置类和 netty 服务器配置类的配置信息
    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);
    /*
     * 根据namesrvConfig和nettyServerConfig创建NamesrvController
     */
    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

    // 将所有的-c的外部配置信息保存到NamesrvController中的Configuration对象属性的allConfigs属性中
    controller.getConfiguration().registerConfig(properties);

    return controller;

2.1 new NamesrvController创建控制器

createNamesrvController方法中,会根据NamesrvConfig和NettyServerConfig调用NamesrvController的构造器创建一个实例。

这个构造器源码也比较简单,就是初始化一些属性。

public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) 
    //nameserver的配置
    this.namesrvConfig = namesrvConfig;
    //nameserver的netty服务的配置
    this.nettyServerConfig = nettyServerConfig;
    //kv配置管理器
    this.kvConfigManager = new KVConfigManager(this);
    //路由信息管理器
    this.routeInfoManager = new RouteInfoManager();
    //Broker连接的各种事件的处理服务,是处理Broker连接发生变化的服务
    //主要用于监听在Channel通道关闭事件触发时调用RouteInfoManager#onChannelDestroy清除路由信息
    this.brokerHousekeepingService = new BrokerHousekeepingService(this);
    //配置类,并将namesrvConfig和nettyServerConfig的配置注册到内部的allConfigs集合中
    this.configuration = new Configuration(log, this.namesrvConfig, this.nettyServerConfig);
    //存储路径配置
    this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");

这里面还会初始化一个brokerHousekeepingService,他是一个ChannelEventListener的实现,主要用于主要用于监听Broker的Channel通道关闭事件,并在事件触发时调用RouteInfoManager#onChannelDestroy清除路由信息。

public class BrokerHousekeepingService implements ChannelEventListener 
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final NamesrvController namesrvController;

    public BrokerHousekeepingService(NamesrvController namesrvController) 
        this.namesrvController = namesrvController;
    
    //连接事件,不处理
    @Override
    public void onChannelConnect(String remoteAddr, Channel channel) 
    
    //连接关闭事件
    @Override
    public void onChannelClose(String remoteAddr, Channel channel) 
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    
    //连接异常事件
    @Override
    public void onChannelException(String remoteAddr, Channel channel) 
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    
    //连接闲置事件
    @Override
    public void onChannelIdle(String remoteAddr, Channel channel) 
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    

3 start启动NamesrvController

在创建NamesrvController之后,调用start方法对其进行启动,实际上就是启动NameServer中的NettyServer服务。

该方法主要做了三件事:

  1. 调用initialize方法初始化NettyServer。创建netty远程服务,初始化Netty线程池,注册请求处理器,配置定时任务,用于扫描并移除不活跃的Broker等操作。

  2. 对JVM添加关闭钩子方法,在NameServer的JVM关闭之前执行,关闭NameServerController中线程池,NettyServer进行关闭进行一些内存清理、对象销毁等操作。

  3. 调用start方法启动NettyServer,并进行监听。

public static NamesrvController start(final NamesrvController controller) throws Exception 
    //不能为null
    if (null == controller) 
        throw new IllegalArgumentException("NamesrvController is null");
    
    /*
     * 1 初始化NettyServer
     * 创建netty远程服务,初始化Netty线程池,注册请求处理器,配置定时任务,用于扫描并移除不活跃的Broker等操作。
     */
    boolean initResult = controller.initialize();
    //初始化失败则退出程序
    if (!initResult) 
        controller.shutdown();
        System.exit(-3);
    
    /*
     * 2 添加关闭钩子方法,在NameServer关闭之前执行,进行一些内存清理、对象销毁等操作
     */
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() 
        @Override
        public Void call() throws Exception 
            controller.shutdown();
            return null;
        
    ));
    /*
     * 3 启动NettyServer,并进行监听
     */
    controller.start();

    return controller;

3.1 initialize初始化NettyServer

该方法用于初始化NettyServer。将会执行创建netty远程服务,初始化Netty线程池,注册请求处理器,配置定时任务,用于扫描并移除不活跃的Broker等初始化操作。

initialize的大概步骤为:

  1. 加载KV配置并存储到kvConfigManager内部的configTable属性中。

  2. 创建NameServer的netty远程服务。remotingServer是一个基于Netty的用于NameServer与Broker、Consumer、Producer进行网络通信的服务端。

  3. 创建netty远程通信执行器线程池remotingExecutor,线程数默认8,线程名以RemotingExecutorThread_为前缀,用作默认的请求处理线程池。

  4. 注册默认请求处理器DefaultRequestProcessor到remotingServer中。

  5. 启动两个定时任务。其中一个每隔十秒钟检测不活跃的Broker并清理相关路由信息,这是一个核心知识点,另一个任务则是每隔十分钟打印kv配置信息。

public boolean initialize() 
    /*
     * 1 加载KV配置并存储到kvConfigManager内部的configTable属性中
     * KVConfig配置文件默认路径是 $user.home/namesrv/kvConfig.json
     */
    this.kvConfigManager.load();
    /*
     * 2 创建NameServer的netty远程服务
     * 设置了一个ChannelEventListener,为此前创建brokerHousekeepingService
     * remotingServer是一个基于Netty的用于NameServer与Broker、Consumer、Producer进行网络通信的服务端
     */
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    /*
     * 3 创建netty远程通信执行器线程池,用作默认的请求处理线程池,线程名以RemotingExecutorThread_为前缀
     */
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
    /*
     * 4 注册默认请求处理器DefaultRequestProcessor
     * 将remotingExecutor绑定到DefaultRequestProcessor上,用作默认的请求处理线程池
     * DefaultRequestProcessor绑定到remotingServer的defaultRequestProcessor属性上
     */
    this.registerProcessor();
    /*
     * 5 启动一个定时任务
     * 首次启动延迟5秒执行,此后每隔10秒执行一次扫描无效的Broker,并清除Broker相关路由信息的任务
     */
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() 

        @Override
        public void run() 
            //扫描notActive的broker
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        
    , 5, 10, TimeUnit.SECONDS);
    /*
     * 6 启动一个定时任务
     * 首次启动延迟1分钟执行,此后每隔10分钟执行一次打印kv配置信息的任务
     */
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() 

        @Override
        public void run() 
            //打印kv配置信息
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        
    , 1, 10, TimeUnit.MINUTES);
    /*
     * Tls传输相关配置,通信安全的文件监听模块,用来观察网络加密配置文件的更改
     */
    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) 
        // Register a listener to reload SslContext
        try 
            fileWatchService = new FileWatchService(
                new String[] 
                    TlsSystemConfig.tlsServerCertPath,
                    TlsSystemConfig.tlsServerKeyPath,
                    TlsSystemConfig.tlsServerTrustCertPath
                ,
                new FileWatchService.Listener() 
                    boolean certChanged, keyChanged = false;
                    @Override
                    public void onChanged(String path) 
                        if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) 
                            log.info("The trust certificate changed, reload the ssl context");
                            reloadServerSslContext();
                        
                        if (path.equals(TlsSystemConfig.tlsServerCertPath)) 
                            certChanged = true;
                        
                        if (path.equals(TlsSystemConfig.tlsServerKeyPath)) 
                            keyChanged = true;
                        
                        if (certChanged && keyChanged) 
                            log.info("The certificate and private key changed, reload the ssl context");
                            certChanged = keyChanged = false;
                            reloadServerSslContext();
                        
                    
                    private void reloadServerSslContext() 
                        ((NettyRemotingServer) remotingServer).loadSslContext();
                    
                );
         catch (Exception e) 
            log.warn("FileWatchService created error, can't load the certificate dynamically");
        
    

    return true;

3.1.1 创建NettyRemotingServer

initialize方法的第一步是创建一个NettyRemotingServer,remotingServer是一个基于Netty的用于NameServer与Broker、Consumer、Producer进行网络通信的服务端。

NettyRemotingServer的构造器主要做了以下事:

  1. 创建serverBootstrap,这是etty服务端启动类,引导启动服务端。

  2. 创建一个公共线程池publicExecutor,线程数默认4个线程,线程名以NettyServerPublicExecutor_为前缀。用在registerProcessor方法中,在该方法注册Netty事件处理器时如果没指定线程池,则会统一使用publicExecutor来处理具体的业务,用于处理某些特定的请求业务,例如异步发送消息的回调。

  3. 根据是否使用epoll模型初始化Boss EventLoopGroup和Worker EventLoopGroup这两个事件循环组,线程数分别默认1个和3个线程,线程名分别以NettyEPOLLBoss_和NettyServerEPOLLSelector_为前缀。这两个线程组对于熟悉Netty的同学应该不陌生了,boss用于处理连接事件,worker用于处理读写事件。

    1. 如果是linux内核,并且指定开启epoll,并且系统支持epoll,才会使用EpollEventLoopGroup类型,否则使用NioEventLoopGroup类型。
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
    final ChannelEventListener channelEventListener) 
    //设置服务器单向、异步发送信号量
    super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
    //创建Netty服务端启动类,引导启动服务端
    this.serverBootstrap = new ServerBootstrap();
    this.nettyServerConfig = nettyServerConfig;
    this.channelEventListener = channelEventListener;
    //服务器回调执行线程数量,默认设置为4
    int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
    if (publicThreadNums <= 0) 
        publicThreadNums = 4;
    
    //创建一个公共线程池,负责处理某些请求业务,例如发送异步消息回调,线程名以NettyServerPublicExecutor_为前缀
    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() 
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThreadRocketMQ源码系列 NameServer 核心源码解析

RocketMQ源码系列 NameServer 核心源码解析

RocketMQ源码—Producer生产者启动源码一万字

RocketMQ源码—Producer生产者启动源码一万字

7RocketMQ 源码解析之 Broker 启动(下)

7RocketMQ 源码解析之 Broker 启动(下)