RocketMQ基础-核心架构剖析

Posted .诗书

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ基础-核心架构剖析相关的知识,希望对你有一定的参考价值。

RocketMQ基础-核心架构剖析

RocketMQ的核心架构

RocketMQ主要分为四个部分:

  1. Producer:负责生产消息;消息发布的角色,支持分布式集群方式部署,Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
  2. Consumer:负责消费消息;消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
  3. NameServer:NameServer是一个非常简单的Topic路由注册中心,支持Broker的动态注册和发现,支持分布式集群方式部署。
  4. BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,支持分布式集群+主从架构部署。

核心模块剖析

NameServer

​ 路由注册中心,充当路由消息的提供者,支持Broker的动态注册与发现。NameServer是整个RocketMQ架构中非常关键的一个角色,如果只是单机部署的话,一单NameServer出现故障将会导致整个RocketMQ服务出现问题,所以NameServer通常都是需要集群部署,达到高可用的效果。

主要包括两个功能:

  1. Broker管理:接收保存Broker集群的注册信息作为路由信息的基本数据,并且提供心跳检测机制,检测已注册的Broker服务是否存活。
  2. 路由信息管理:每个NameServer节点都保存了关于Broker集群的整个路由信息以及用于客户端查询的队列信息,Producer和Consumer就可以通过NameServer集群获取整个Broker集群的路由信息,进行消息投递及消费。

值得注意的点:NameServer通常都是集群部署,但是每个NameServer节点都是完全独立的,节点之间没有信息交换每个Broker都会向每台NameServer注册路由信息,所以每个NameServer上都会有完整的Broker路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。

Broker Server

​ 消息中转角色,负责存储消息、转发消息。是最重要最复杂的一个模块。在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

Broker包含了以下几个重要的子模块。

  1. Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
  2. Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息
  3. Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
  4. HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
  5. Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。

在理解Broker之前需要了解一些RocketMQ的基本概念。

  1. Message

    消息,消息系统所传输信息的物理载体,是生产和消费数据的最小单位,每条消息必须属于一个Topic。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。

  2. Topic

    表示一类消息的集合,每个主题包含若干条消息,每条Message只能属于一个Topic,是RocketMQ进行消息订阅的基本单位。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker

  3. Tag

    为消息设置的标志,用于同一Topic下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一Topic下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。

  4. Message Queue

    消息队列,用于存储消息的物理地址每个Topic中的消息地址存储于多个 Message Queue 中

Broker通过集群化部署来抗下高并发访问。

Broker接收到消息后,都是存储到磁盘文件中的,通过分布式部署,将消息分撒到多台机器上存储,实现海量消息存储。

Broker通过主从架构+多副本策略,实现高可用保障,Master Broker收到消息后会将消息同步给Slave Broker,这样副本中就有一份一模一样的数据。每个Broker都得向NameServer注册,所以Master Broker和Slave broker都会向NameServer注册。Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。

Producer

​ 生产者,Producer与NameServer集群中的其中一个节点(随机选择)建立长连接定期从NameServer获取Topic路由信息,并向提供Topic 服务的MasterBroker建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

涉及到的基本概念:

  1. Producer Group

    同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。

Consumer

​ 消费者,Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的MasterBroker、SlaveBroker建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。

涉及到的基本概念:

  1. Pull Consumer

    Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。

  2. Push Consumer

    Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。

  3. Consumer Group

    同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)

    给订阅同一个Topic的不同业务系统设置不同的Consumer Group,可以实现一个消息被多个业务系统所消费到

    ​ 比如:真实业务场景中,在用户下单付款后,仓储系统要安排发货,积分系统要增加积分,销售系统要给用户分配销售,这种业务场景中,仓储系统、积分系统、销售系统只需要订阅同一个Topic(order_topic),只是Consumer Group不同,订单系统在用户付款后推送付款信息到这个Topic中即可,仓储、积分、销售会同时消费到这个消息,完成自身的业务即可。

  1. Clustering/Broadcasting

    Clustering:集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。

    Broadcasting:广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。

    **值得注意的一个点在于,同一个消息,集群模式下,只会被同一个Consumer Group中的一个Consumer消费,而广播模式会被所有的Consumer都消费一次。**比如:10台服务器搭建的一个消费者集群,消息模式设置为 Clustering时,收到一个消息,这个消息只会被10台服务器中的一台实例消费到,而如果是Broadcasting模式,10台服务器实例都会接收到这个消息。常用的基本上都是集群模式。

总结

经过了对RocketMQ架构的剖析后,此时,RocketMQ的架构图应该更近详细:

结合上面的详细架构图,描述集群工作流程:

  1. 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。

  2. Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。

  3. 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。

  4. Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。

  5. Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

深入剖析RocketMQ源码-NameServer

一、RocketMQ架构简介

1.1 逻辑部署图

(图片来自网络)

1.2 核心组件说明

通过上图可以看到,RocketMQ的核心组件主要包括4个,分别是NameServer、Broker、Producer和Consumer,下面我们先依次简单说明下这四个核心组件:

NameServer:NameServer充当路由信息的提供者。生产者或消费者能够通过NameServer查找各Topic相应的Broker IP列表。多个Namesrver实例组成集群,但相互独立,没有信息交换。

Broker:消息中转角色,负责存储消息、转发消息。Broker服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。Broker服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

Producer:负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到Broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。

Consumer:负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

除了上面说的三个核心组件外,还有Topic这个概念下面也会多次提到:

Topic:表示一类消息的集合,每个Topic包含若干条消息,每条消息只能属于一个Topic,是RocketMQ进行消息订阅的基本单位。一个Topic可以分片在多个Broker集群上,每一个Topic分片包含多个queue,具体结构可以参考下图:

1.3 设计理念

RocketMQ是基于主题的发布与订阅模式,核心功能包括消息发送、消息存储、消息消费,整体设计追求简单与性能第一,归纳来说主要是下面三种:

  • NameServer取代ZK充当注册中心,NameServer集群间互不通信,容忍路由信息在集群内分钟级不一致,更加轻量级;

  • 使用内存映射机制实现高效的IO存储,达到高吞吐量;

  • 容忍设计缺陷,通过ACK确保消息至少消费一次,但是如果ACK丢失,可能消息重复消费,这种情况设计上允许,交给使用者自己保证。

本文重点介绍的就是NameServer,我们下面一起来看下NameServer是如何启动以及如何进行路由管理的。

二、NameServer架构设计

在第一章已经简单介绍了NameServer取代zk作为一种更轻量级的注册中心充当路由信息的提供者。那么具体是如何来实现路由信息管理的呢?我们先看下图:

上面的图描述了NameServer进行路由注册、路由剔除和路由发现的核心原理。

路由注册:Broker服务器在启动的时候会想NameServer集群中所有的NameServer发送心跳信号进行注册,并会每隔30秒向nameserver发送心跳,告诉NameServer自己活着。NameServer接收到Broker发送的心跳包之后,会记录该broker信息,并保存最近一次收到心跳包的时间。

路由剔除:NameServer和每个Broker保持长连接,每隔30秒接收Broker发送的心跳包,同时自身每个10秒扫描BrokerLiveTable,比较上次收到心跳时间和当前时间比较是否大于120秒,如果超过,那么认为Broker不可用,剔除路由表中该Broker相关信息。

路由发现:路由发现不是实时的,路由变化后,NameServer不主动推给客户端,等待producer定期拉取最新路由信息。这样的设计方式降低了NameServer实现的复杂性,当路由发生变化时通过在消息发送端的容错机制来保证消息发送的高可用(这块内容会在后续介绍producer消息发送时介绍,本文不展开讲解)。

高可用:NameServer通过部署多台NameServer服务器来保证自身的高可用,同时多个NameServer服务器之间不进行通信,这样路由信息发生变化时,各个NameServer服务器之间数据可能不是完全相同的,但是通过发送端的容错机制保证消息发送的高可用。这个也正是NameServer追求简单高效的目的所在。

三、 启动流程

在整理了解了NameServer的架构设计之后,我们先来看下NameServer到底是如何启动的呢?

既然是源码解读,那么我们先来看下代码入口:org.apache.rocketmq.namesrv.NamesrvStartup#main(String[] args),实际调用的是main0()方法,

代码如下:

public static NamesrvController main0(String[] args) {
​
    try {
        //创建namesrvController
        NamesrvController controller = createNamesrvController(args);
        //初始化并启动NamesrvController
        start(controller);
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
​
    return null;
}

通过main方法启动NameServer,主要分为两大步,先创建NamesrvController,然后再初始化并启动NamesrvController。我们分别展开来分析。

3.1 时序图

具体展开阅读代码之前,我们先通过一个序列图对整体流程有个了解,如下图:

3.2 创建NamesrvController

先来看核心代码,如下:

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
    // 设置版本号为当前版本号
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    //PackageConflictDetect.detectFastjson();
  //构造org.apache.commons.cli.Options,并添加-h -n参数,-h参数是打印帮助信息,-n参数是指定namesrvAddr
    Options options = ServerUtil.buildCommandlineOptions(new Options());
    //初始化commandLine,并在options中添加-c -p参数,-c指定nameserver的配置文件路径,-p标识打印配置信息
    commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
    if (null == commandLine) {
        System.exit(-1);
        return null;
    }
  //nameserver配置类,业务参数
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    //netty服务器配置类,网络参数
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    //设置nameserver的端口号
    nettyServerConfig.setListenPort(9876);
    //命令带有-c参数,说明指定配置文件,需要根据配置文件路径读取配置文件内容,并将文件中配置信息赋值给NamesrvConfig和NettyServerConfig
    if (commandLine.hasOption(\'c\')) {
        String file = commandLine.getOptionValue(\'c\');
        if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            //反射的方式
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);
      //设置配置文件路径
            namesrvConfig.setConfigStorePath(file);
​
            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }
  //命令行带有-p,说明是打印参数的命令,那么就打印出NamesrvConfig和NettyServerConfig的属性。在启动NameServer时可以先使用./mqnameserver -c configFile -p打印当前加载的配置属性 
    if (commandLine.hasOption(\'p\')) {
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        //打印参数命令不需要启动nameserver服务,只需要打印参数即可
        System.exit(0);
    }
  //解析命令行参数,并加载到namesrvConfig中
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
  //检查ROCKETMQ_HOME,不能为空
    if (null == namesrvConfig.getRocketmqHome()) {
        System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    }
  //初始化logback日志工厂,rocketmq默认使用logback作为日志输出
    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
    JoranConfigurator configurator = new JoranConfigurator();
    configurator.setContext(lc);
    lc.reset();
    configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
​
    log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
​
    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);
  //创建NamesrvController
    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
​
    //将全局Properties的内容复制到NamesrvController.Configuration.allConfigs中
    // remember all configs to prevent discard
    controller.getConfiguration().registerConfig(properties);
​
    return controller;
}

通过上面对每一行代码的注释,可以看出来,创建NamesrvController的过程主要分为两步:

可见NamesrvConfig和NettyServerConfig是想当重要的,这两个类分别是NameServer的业务参数和网络参数,我们分别看下这两个类里面有哪些属性:

NamesrvConfig

NettyServerConfig

3.3 初始化并启动

创建了NamesrvController实例之后,开始初始化并启动NameServer。

首先进行初始化,代码入口是NamesrvController#initialize。

public boolean initialize() {
  //加载kvConfigPath下kvConfig.json配置文件里的KV配置,然后将这些配置放到KVConfigManager#configTable属性中
    this.kvConfigManager.load();
  //根据nettyServerConfig初始化一个netty服务器。
    //brokerHousekeepingService是在NamesrvController实例化时构造函数里实例化的,该类负责Broker连接事件的处理,实现了ChannelEventListener,主要用来管理RouteInfoManager的brokerLiveTable
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
  //初始化负责处理Netty网络交互数据的线程池,默认线程数是8个
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
  //注册Netty服务端业务处理逻辑,如果开启了clusterTest,那么注册的请求处理类是ClusterTestRequestProcessor,否则请求处理类是DefaultRequestProcessor
    this.registerProcessor();
  //注册心跳机制线程池,延迟5秒启动,每隔10秒遍历RouteInfoManager#brokerLiveTable这个属性,用来扫描不存活的broker
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
​
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
  //注册打印KV配置线程池,延迟1分钟启动、每10分钟打印出kvConfig配置
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
​
        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);
  //rocketmq可以通过开启TLS来提高数据传输的安全性,如果开启了,那么需要注册一个监听器来重新加载SslContext
    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
        // Register a listener to reload SslContext
        try {
            fileWatchService = new FileWatchService(
                new String[] {
                    TlsSystemConfig.tlsServerCertPath,
                    TlsSystemConfig.tlsServerKeyPath,
                    TlsSystemConfig.tlsServerTrustCertPath
                },
                new FileWatchService.Listener() {
                    boolean certChanged, keyChanged = false;
                    @Override
                    public void onChanged(String path) {
                        if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                            log.info("The trust certificate changed, reload the ssl context");
                            reloadServerSslContext();
                        }
                        if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                            certChanged = true;
                        }
                        if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                            keyChanged = true;
                        }
                        if (certChanged && keyChanged) {
                            log.info("The certificate and private key changed, reload the ssl context");
                            certChanged = keyChanged = false;
                            reloadServerSslContext();
                        }
                    }
                    private void reloadServerSslContext() {
                        ((NettyRemotingServer) remotingServer).loadSslContext();
                    }
                });
        } catch (Exception e) {
            log.warn("FileWatchService created error, can\'t load the certificate dynamically");
        }
    }
​
    return true;
}

上面的代码是NameServer初始化流程,通过每行代码的注释,可以看出来,主要有5步骤操作:

  • Step1:加载KV配置,并写入到KVConfigManager的configTable属性中;

  • Step2:初始化netty服务器;

  • Step3:初始化处理netty网络交互数据的线程池;

  • Step4:注册心跳机制线程池,启动5秒后每隔10秒检测一次Broker的存活情况;

  • Step5:注册打印KV配置的线程池,启动1分钟后,每隔10分钟打印一次KV配置。

RocketMQ的开发团队还使用了一个常用的编程技巧,就是使用JVM钩子函数对NameServer进行优雅停机。这样在JVM进程关闭前,会先执行shutdown操作。

Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
    @Override
    public Void call() throws Exception {
        controller.shutdown();
        return null;
    }
}));

执行start函数,启动NameServer。代码比较简单,就是将第一步中创建的netty server进行启动。其中remotingServer.start()方法不展开详细说明了,需要对netty比较熟悉,不是本篇文章重点,有兴趣的同学可以自行下载源码阅读。

public void start() throws Exception {
    //启动netty服务
    this.remotingServer.start();
  //如果开启了TLS
    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
}

四、路由管理

我们在第二章开篇有了解到NameServer作为一个轻量级的注册中心,主要是为消息生产者和消费者提供Topic的路由信息,并对这些路由信息和Broker节点进行管理,主要包括路由注册、路由剔除和路由发现。

本章将会通过源码的角度来具体分析NameServer是如果进行路由信息管理的。核心代码主要都在org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager中实现。

4.1 路由元信息

在了解路由信息管理之前,我们首先需要了解NameServer到底存储了哪些路由元信息,数据结构分别是什么样的。

查看代码我们可以看到主要通过5个属性来维护路由元信息,如下:

private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

我们依次对这5个属性进行展开说明。

4.1.1 TopicQueueTable

说明:Topic消息队列路由信息,消息发送时根据路由表进行负载均衡。

数据结构:HashMap结构,key是Topic名字,value是一个类型是QueueData的队列集合。在第一章就讲过,一个Topic中有多个队列。QueueData的数据结构如下:

数据结构示例:

topicQueueTable:{
    "topic1": [
        {
            "brokerName": "broker-a",
            "readQueueNums":4,
            "writeQueueNums":4,
            "perm":6,
            "topicSynFlag":0,
        },
        {
            "brokerName": "broker-b",
            "readQueueNums":4,
            "writeQueueNums":4,
            "perm":6,
            "topicSynFlag":0,
        }
    ]
}

4.1.2 BrokerAddrTable

说明:Broker基础信息,包含BrokerName、所属集群名称、主备Broker地址。

数据结构:HashMap结构,key是BrokerName,value是一个类型是BrokerData的对象。BrokerData的数据结构如下(可以结合下面Broker主从结构逻辑图来理解):

Broker主从结构逻辑图:

数据结构示例:

brokerAddrTable:{
    "broker-a": {
        "cluster": "c1",
        "brokerName": "broker-a",
        "brokerAddrs": {
            0: "192.168.1.1:10000",
            1: "192.168.1.2:10000"
        }
    },
    "broker-b": {
        "cluster": "c1",
        "brokerName": "broker-b",
        "brokerAddrs": {
            0: "192.168.1.3:10000",
            1: "192.168.1.4:10000"
        }
    }
}

4.1.3 ClusterAddrTable

说明:Broker集群信息,存储集群中所有Broker名称。

数据结构:HashMap结构,key是ClusterName,value是存储BrokerName的Set结构。

数据结构示例:

clusterAddrTable:{
    "c1": ["broker-a","broker-b"]
}

4.1.4 BrokerLiveTable

说明:Broker状态信息。NameServer每次收到心跳包时会替换该信息

数据结构:HashMap结构,key是Broker的地址,value是BrokerLiveInfo结构的该Broker信息对象。BrokerLiveInfo的数据结构如下:

数据结构示例:

brokerLiveTable:{
    "192.168.1.1:10000": {
            "lastUpdateTimestamp": 1518270318980,
            "dataVersion":versionObj1,
            "channel":channelObj,
            "haServerAddr":""
    },
    "192.168.1.2:10000": {
            "lastUpdateTimestamp": 1518270318980,
            "dataVersion":versionObj1,
            "channel":channelObj,
            "haServerAddr":"192.168.1.1:10000"
     },
    "192.168.1.3:10000": {
            "lastUpdateTimestamp": 1518270318980,
            "dataVersion":versionObj1,
            "channel":channelObj,
            "haServerAddr":""
     },
    "192.168.1.4:10000": {
            "lastUpdateTimestamp": 1518270318980,
            "dataVersion":versionObj1,
            "channel":channelObj,
            "haServerAddr":"192.168.1.3:10000"
     }
}

4.1.5 filterServerTable

说明:Broker上的FilterServer列表,消息过滤服务器列表,后续介绍Consumer时会介绍,consumer拉取数据是通过filterServer拉取,consumer向Broker注册。

数据结构:HashMap结构,key是Broker地址,value是记录了filterServer地址的List集合。

4.2 路由注册

路由注册是通过Broker和NameServer之间的心跳功能来实现的。主要分为两步:

我们分别展开分析这两步。

4.2.1 Broker发送心跳包

发送心跳包的核心逻辑是在Broker启动逻辑里,代码入口是org.apache.rocketmq.broker.BrokerController#start,本篇文章重点关注的是发送心跳包的逻辑实现,只列出发送心跳包的核心代码,如下:

1)创建了一个线程池注册Broker,程序启动10秒后执行,每隔30秒(默认30s,时间间隔在10秒到60秒之间,BrokerConfig.getRegisterNameServerPeriod()的默认值是30秒)执行一次。

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
​
    @Override
    public void run() {
        try {
            BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
        } catch (Throwable e) {
            log.error("registerBrokerAll Exception", e);
        }
    }
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

2)封装Topic配置和版本号之后,进行实际的路由注册(注:封装Topic配置不是本篇重点,会在介绍Broker源码时重点讲解)。实际路由注册是在org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll中实现,核心代码如下:

public List<RegisterBrokerResult> registerBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final boolean oneway,
    final int timeoutMills,
    final boolean compressed) {
​
    final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
    //获取nameserver地址列表
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
    /**
      *封装请求包头start
      *封装请求包头,主要封装broker相关信息
    **/
        final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        requestHeader.setHaServerAddr(haServerAddr);
        requestHeader.setCompressed(compressed);
    //封装requestBody,包括topic和filterServerList相关信息
        RegisterBrokerBody requestBody = new RegisterBrokerBody();
        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
        requestBody.setFilterServerList(filterServerList);
        final byte[] body = requestBody.encode(compressed);
        final int bodyCrc32 = UtilAll.crc32(body);
        requestHeader.setBodyCrc32(bodyCrc32);
        /**
      *封装请求包头end
    **/
        //开启多线程到每个nameserver进行注册
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        //实际进行注册方法
                        RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                        if (result != null) {
                            //封装nameserver返回的信息
                            registerBrokerResultList.add(result);
                        }
​
                        log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                    } catch (Exception e) {
                        log.warn("registerBroker Exception, {}", namesrvAddr, e);
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }
​
        try {
            countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }
    }
​
    return registerBrokerResultList;
}

从上面代码来看,也比较简单,首先需要封装请求包头和requestBody,然后开启多线程到每个NameServer服务器去注册。

请求包头类型为RegisterBrokerRequestHeader,主要包括如下字段:

requestBody类型是RegisterBrokerBody,主要包括如下字段:

1)实际的路由注册是通过registerBroker方法实现,核心代码如下:

private RegisterBrokerResult registerBroker(
    final String namesrvAddr,
    final boolean oneway,
    final int timeoutMills,
    final RegisterBrokerRequestHeader requestHeader,
    final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException {
    //创建请求指令,需要注意RequestCode.REGISTER_BROKER,nameserver端的网络处理器会根据requestCode进行相应的业务处理
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
    request.setBody(body);
  //基于netty进行网络传输
    if (oneway) {
        //如果是单向调用,没有返回值,不返回nameserver返回结果
        try {
            this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
        } catch (RemotingTooMuchRequestException e) {
            // Ignore
        }
        return null;
    }
  //异步调用向nameserver发起注册,获取nameserver的返回信息
    RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            //获取返回的reponseHeader
            RegisterBrokerResponseHeader responseHeader =
                (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
            //重新封装返回结果,更新masterAddr和haServerAddr
            RegisterBrokerResult result = new RegisterBrokerResult();
            result.setMasterAddr(responseHeader.getMasterAddr());
            result.setHaServerAddr(responseHeader.getHaServerAddr());
            if (response.getBody() != null) {
                result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
            }
            return result;
        }
        default:
            break;
    }
​
    throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());
}

borker和NameServer之间通过netty进行网络传输,Broker向NameServer发起注册时会在请求中添加注册码RequestCode.REGISTER_BROKER。这是一种网络跟踪方法,RocketMQ的每个请求都会定义一个requestCode,服务端的网络处理器会根据不同的requestCode进行影响的业务处理。

4.2.2 NameServer处理心跳包

Broker发出路由注册的心跳包之后,NameServer会根据心跳包中的requestCode进行处理。NameServer的默认网络处理器是DefaultRequestProcessor,具体代码如下:

public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
    if (ctx != null) {
        log.debug("receive request, {} {} {}",
                  request.getCode(),
                  RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                  request);
    }
    switch (request.getCode()) {
        ......
        //,如果是RequestCode.REGISTER_BROKER,进行broker注册
        case RequestCode.REGISTER_BROKER:
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                return this.registerBrokerWithFilterServer(ctx, request);
            } else {
                return this.registerBroker(ctx, request);
            }
        ......
        default:
            break;
    }
    return null;
}

判断requestCode,如果是RequestCode.REGISTER_BROKER,那么确定业务处理逻辑是注册Broker。根据Broker版本号选择不同的方法,我们已V3_0_11以上为例,调用registerBrokerWithFilterServer方法进行注册主要步骤分为三步:

核心注册逻辑是由RouteInfoManager#registerBroker来实现,核心代码如下:

public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            //加写锁,防止并发写RoutInfoManager中的路由表信息。
            this.lock.writeLock().lockInterruptibly();
      //根据clusterName从clusterAddrTable中获取所有broker名字集合
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            //如果没有获取到,说明broker所属集群还没记录,那么需要创建,并将brokerName加入到集群的broker集合中
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);

            boolean registerFirst = false;
      //根据brokerName尝试从brokerAddrTable中获取brokerData
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                //如果没获取到brokerData,新建BrokerData并放入brokerAddrTable,registerFirst设为true;
                registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                this.brokerAddrTable.put(brokerName, brokerData);
            }
            //更新brokerData中的brokerAddrs
            Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
            //考虑到可能出现master挂了,slave变成master的情况,这时候brokerId会变成0,这时候需要把老的brokerAddr给删除
            //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
            //The same IP:PORT must only have one record in brokerAddrTable
            Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
            while (it.hasNext()) {
                Entry<Long, String> item = it.next();
                if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                    it.remove();
                }
            }
      //更新brokerAddrs,根据返回的oldAddr判断是否是第一次注册的broker
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);
​
            //如过Broker是Master,并且Broker的Topic配置信息发生变化或者是首次注册,需要创建或更新Topic路由元数据,填充topicQueueTable
            if (null != topicConfigWrapper
                && MixAll.MASTER_ID == brokerId) {
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) {
                    ConcurrentMap<String, TopicConfig> tcTable =
                        topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            //创建或更新Topic路由元数据
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }
      //更新BrokerLivelnfo,BrokeLivelnfo是执行路由删除的重要依据
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                                                                         new BrokerLiveInfo(
                                                                             System.currentTimeMillis(),
                                                                             topicConfigWrapper.getDataVersion(),
                                                                             channel,
                                                                             haServerAddr));
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
            }
      //注册Broker的filterServer地址列表
            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddr);
                } else {
                    this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }
      //如果此Broker为从节点,则需要查找Broker Master的节点信息,并更新对应masterAddr属性
            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if (brokerLiveInfo != null) {
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }
​
    return result;
}

通过上面的源码分析,可以分解出一个Broker的注册主要分7步:

  • Step1:加写锁,防止并发写RoutInfoManager中的路由表信息;

  • Step2:判断Broker所属集群是否存在,不存在需要创建,并将Broker名加入到集群Broker集合中;

  • Step3:维护BrokerData;

  • Step4:如过Broker是Master,并且Broker的Topic配置信息发生变化或者是首次注册,需要创建或更新Topic路由元数据,填充TopicQueueTable;

  • Step5:更新BrokerLivelnfo;

  • Step6:注册Broker的filterServer地址列表;

  • Step7:如果此Broker为从节点,则需要查找Broker Master的节点信息,并更新对应masterAddr属性,并返回给Broker端。

4.3 路由剔除

4.3.1 触发条件

路由剔除的触发条件主要有两个:

4.3.2 源码解析

上面描述的触发点最终删除路由的逻辑是一样的,统一在RouteInfoManager#onChannelDestroy

中实现,核心代码如下:

public void onChannelDestroy(String remoteAddr, Channel channel) {
    String brokerAddrFound = null;
    if (channel != null) {
        try {
            try {
                //加读锁
                this.lock.readLock().lockInterruptibly();
                //通过channel从brokerLiveTable中找出对应的Broker地址
                Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                    this.brokerLiveTable.entrySet().iterator();
                while (itBrokerLiveTable.hasNext()) {
                    Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                    if (entry.getValue().getChannel() == channel) {
                        brokerAddrFound = entry.getKey();
                        break;
                    }
                }
            } finally {
                //释放读锁
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
  //若该Broker已经从存活的Broker地址列表中被清除,则直接使用remoteAddr
    if (null == brokerAddrFound) {
        brokerAddrFound = remoteAddr;
    } else {
        log.info("the broker\'s channel destroyed, {}, clean it\'s data structure at once", brokerAddrFound);
    }
​
    if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
​
        try {
            try {
                //申请写锁
                this.lock.writeLock().lockInterruptibly();
                //根据brokerAddress,将这个brokerAddress从brokerLiveTable和filterServerTable中移除
                this.brokerLiveTable.remove(brokerAddrFound);
                this.filterServerTable.remove(brokerAddrFound);
                String brokerNameFound = null;
                boolean removeBrokerName = false;
                Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                    this.brokerAddrTable.entrySet().iterator();
                //遍历brokerAddrTable
                while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                    BrokerData brokerData = itBrokerAddrTable.next().getValue();
​
                    Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<Long, String> entry = it.next();
                        Long brokerId = entry.getKey();
                        String brokerAddr = entry.getValue();
                        //根据brokerAddress找到对应的brokerData,并将brokerData中对应的brokerAddress移除
                        if (brokerAddr.equals(brokerAddrFound)) {
                            brokerNameFound = brokerData.getBrokerName();
                            it.remove();
                            log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                     brokerId, brokerAddr);
                            break;
                        }
                    }
          //如果移除后,整个brokerData的brokerAddress空了,那么将整个brokerData移除
                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        removeBrokerName = true;
                        itBrokerAddrTable.remove();
                        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                                 brokerData.getBrokerName());
                    }
                }
​
                if (brokerNameFound != null && removeBrokerName) {
                    //遍历clusterAddrTable
                    Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<String, Set<String>> entry = it.next();
                        String clusterName = entry.getKey();
                        Set<String> brokerNames = entry.getValue();
                        //根据第三步中获取的需要移除的brokerName,将对应的brokerName移除了
                        boolean removed = brokerNames.remove(brokerNameFound);
                        if (removed) {
                            log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                                     brokerNameFound, clusterName);
              //如果移除后,该集合为空,那么将整个集群从clusterAddrTable中移除
                            if (brokerNames.isEmpty()) {
                                log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                                         clusterName);
                                it.remove();
                            }
​
                            break;
                        }
                    }
                }
​
                if (removeBrokerName) {
                    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                        this.topicQueueTable.entrySet().iterator();
                    //遍历topicQueueTable
                    while (itTopicQueueTable.hasNext()) {
                        Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
                        String topic = entry.getKey();
                        List<QueueData> queueDataList = entry.getValue();
​
                        Iterator<QueueData> itQueueData = queueDataList.iterator();
                        while (itQueueData.hasNext()) {
                            QueueData queueData = itQueueData.next();
                            //根据brokerName,将topic下对应的broker移除掉
                            if (queueData.getBrokerName().equals(brokerNameFound)) {
                                itQueueData.remove();
                                log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                                         topic, queueData);
                            }
                        }
            //如果该topic下只有一个待移除的broker,那么该topic也从table中移除
                        if (queueDataList.isEmpty()) {
                            itTopicQueueTable.remove();
                            log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                                     topic);
                        }
                    }
                }
            } finally {
                //释放写锁
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
}

路由删除整体逻辑主要分为6步:

  • Step1:加readlock,通过channel从BrokerLiveTable中找出对应的Broker地址,释放readlock,若该Broker已经从存活的Broker地址列表中被清除,则直接使用remoteAddr。

  • Step2:申请写锁,根据BrokerAddress从BrokerLiveTable、filterServerTable移除。

  • Step3:遍历BrokerAddrTable,根据BrokerAddress找到对应的brokerData,并将brokerData中对应的brokerAddress移除,如果移除后,整个brokerData的brokerAddress空了,那么将整个brokerData移除。

  • Step4:遍历clusterAddrTable,根据第三步中获取的需要移除的BrokerName,将对应的brokerName移除了。如果移除后,该集合为空,那么将整个集群从clusterAddrTable中移除。

  • Step5:遍历TopicQueueTable,根据BrokerName,将Topic下对应的Broker移除掉,如果该Topic下只有一个待移除的Broker,那么该Topic也从table中移除。

  • Step6:释放写锁。

从上面可以看出,路由剔除的整体逻辑比较简单,就是单纯地针对路由元信息的数据结构进行操作。为了大家能够更好地理解这块代码,建议大家对照4.1中介绍的路由元信息的数据结构来进行代码走读。

4.4 路由发现

当路由信息发生变化之后,NameServer不会主动推送给客户端,而是等待客户端定期到nameserver主动拉取最新路由信息。这种设计方式降低了NameServer实现的复杂性。

4.4.1 producer主动拉取

producer在启动后会开启一系列定时任务,其中有一个任务就是定期从NameServer获取Topic路由信息。代码入口是MQClientInstance#start-ScheduledTask(),核心代码如下:

private void startScheduledTask() {
    ......
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
​
        @Override
        public void run() {
            try {
                //从nameserver更新最新的topic路由信息
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
​
    ......
}
​
/**
    * 从nameserver获取topic路由信息
    */
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
                                                      boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
    ......
    //向nameserver发送请求包,requestCode为RequestCode.GET_ROUTEINFO_BY_TOPIC
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);
  ......
}

producer和NameServer之间通过netty进行网络传输,producer向NameServer发起的请求中添加注册码

RequestCode.GET_ROUTEINFO_BY_TOPIC。

4.4.2 NameServer返回路由信息

NameServer收到producer发送的请求后,会根据请求中的requestCode进行处理。处理requestCode同样是在默认的网络处理器DefaultRequestProcessor中进行处理,最终通过RouteInfoManager#pickupTopicRouteData来实现。

TopicRouteData结构

在正式解析源码前,我们先看下NameServer返回给producer的数据结构。通过代码可以看到,返回的是一个TopicRouteData对象,具体结构如下:

其中QueueData,BrokerData,filterServerTable在4.1章节介绍路由元信息时有介绍。

源码分析

在了解了返回给producer的TopicRouteData结构后,我们进入RouteInfoManager#pickupTopicRouteData方法来看下具体如何实现。

public TopicRouteData pickupTopicRouteData(final String topic) {
    TopicRouteData topicRouteData = new TopicRouteData();
    boolean foundQueueData = false;
    boolean foundBrokerData = false;
    Set<String> brokerNameSet = new HashSet<String>();
    List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
    topicRouteData.setBrokerDatas(brokerDataList);
​
    HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
    topicRouteData.setFilterServerTable(filterServerMap);
​
    try {
        try {
            //加读锁
            this.lock.readLock().lockInterruptibly();
            //从元数据topicQueueTable中根据topic名字获取队列集合
            List<QueueData> queueDataList = this.topicQueueTable.get(topic);
            if (queueDataList != null) {
                //将获取到的队列集合写入topicRouteData的queueDatas中
                topicRouteData.setQueueDatas(queueDataList);
                foundQueueData = true;
​
                Iterator<QueueData> it = queueDataList.iterator();
                while (it.hasNext()) {
                    QueueData qd = it.next();
                    brokerNameSet.add(qd.getBrokerName());
                }
        //遍历从QueueData集合中提取的brokerName
                for (String brokerName : brokerNameSet) {
                    //根据brokerName从brokerAddrTable获取brokerData
                    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                    if (null != brokerData) {
                        //克隆brokerData对象,并写入到topicRouteData的brokerDatas中
                        BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone());
                        brokerDataList.add(brokerDataClone);
                        foundBrokerData = true;
                        //遍历brokerAddrs
                        for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                            //根据brokerAddr获取filterServerList,封装后写入到topicRouteData的filterServerTable中
                            List<String> filterServerList = this.filterServerTable.get(brokerAddr);
                            filterServerMap.put(brokerAddr, filterServerList);
                        }
                    }
                }
            }
        } finally {
            //释放读锁
            this.lock.readLock().unlock();
        }
    } catch (Exception e) {
        log.error("pickupTopicRouteData Exception", e);
    }
​
    log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
​
    if (foundBrokerData && foundQueueData) {
        return topicRouteData;
    }
​
    return null;
}

上面代码封装了TopicRouteData的queueDatas、BrokerDatas和filterServerTable,还有orderTopicConf字段没封装,我们再看下这个字段是在什么时候封装的,我们向上看RouteInfoManager#pickupTopicRouteData的调用方法DefaultRequestProcessor#getRouteInfoByTopic如下:

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
                                           RemotingCommand request) throws RemotingCommandException {
    ......
  //这块代码就是上面解析的代码,获取到topicRouteData对象
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
​
    if (topicRouteData != null) {
        //判断nameserver的orderMessageEnable配置是否打开
        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
            //如果配置打开了,根据namespace和topic名字获取kvConfig配置文件中顺序消息配置内容
            String orderTopicConf =
                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                                                                        requestHeader.getTopic());
            //封装orderTopicConf
            topicRouteData.setOrderTopicConf(orderTopicConf);
        }
​
        byte[] content = topicRouteData.encode();
        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
  //如果没有获取到topic路由,那么reponseCode为TOPIC_NOT_EXIST
    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
                       + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
    return response;
}

结合这两个方法,我们可以总结出查找Topic路由主要分为3个步骤:

五、小结

本篇文章主要是从源码的角度给大家介绍了RocketMQ的NameServer,包括NameServer的启动流程、路由注册、路由剔除和路由发现。我们在了解了NameServer的设计原理之后,也可以回过头思考下在设计过程中一些值得我们学习的小技巧,在此我抛砖引玉提出两点:

  • 启动流程注册JVM钩子用于优雅停机。这是一个编程技巧,我们在实际开发过程中,如果有使用线程池或者一些常驻线程任务时,可以考虑通过注册JVM钩子的方式,在JVM关闭前释放资源或者完成一些事情来保证优雅停机。

  • 更新路由表时需要通过加锁防止并发操作,这里使用的是锁粒度较少的读写锁,允许多个消息发送者并发读,保证消息发送时的高并发,但同一时刻NameServer只处理一个Broker心跳包,多个心跳包请求串行执行,这也是读写锁经典使用场景。

六、参考资料

1、《RocketMQ技术内幕》

2、《RocketMQ核心原理和实践》

3、Apache RocketMQ开发者指南

以上是关于RocketMQ基础-核心架构剖析的主要内容,如果未能解决你的问题,请参考以下文章

深度挖掘RocketMQ底层源码「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(上篇)

RocketMQ NameServer深入剖析

RocketMQ基础概念剖析,并分析一下Producer的底层源码

RocketMQ的消息架构模型以及核心概念

深度挖掘 RocketMQ底层源码「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)

深度挖掘 RocketMQ底层源码「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)