RocketMQ Source Code: The Heartbeat Service Between Broker and NameServer

Posted 刘Java


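This article walks through the source code of the heartbeat service between RocketMQ's Broker and NameServer. It has three parts:

  1. How the Broker sends heartbeat registration requests;
  2. How the NameServer handles heartbeat registration requests;
  3. The NameServer's heartbeat detection service.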


1 The Broker Sends Heartbeat Registration Requests

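During startup, the Broker establishes and maintains long-lived connections with every NameServer, then starts a scheduled task that periodically sends heartbeat packets. Each heartbeat packet carries the current Broker's information (address, name, ID, and so on) together with the information of every Topic the Broker stores. Once registration succeeds, the NameServer cluster holds the mapping between Topics and Brokers.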
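To make the end result concrete, here is a toy model of the Topic-to-Broker mapping a NameServer ends up holding once heartbeats arrive. It is a minimal sketch under our own assumptions: ToyRouteTable, register and brokersFor are hypothetical names, not RocketMQ's RouteInfoManager API, and the real route data (queue counts, permissions, broker liveness) is much richer.

```java
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, heavily simplified NameServer-side route table (not RocketMQ's RouteInfoManager). */
class ToyRouteTable {
    // topic name -> names of the brokers that reported hosting it
    private final Map<String, Set<String>> topicToBrokers = new ConcurrentHashMap<>();

    /** Applies one heartbeat: the broker reports every topic it currently stores. */
    void register(String brokerName, Collection<String> topics) {
        for (String topic : topics) {
            topicToBrokers.computeIfAbsent(topic, t -> ConcurrentHashMap.newKeySet()).add(brokerName);
        }
    }

    /** Brokers that a client could be routed to for the given topic. */
    Set<String> brokersFor(String topic) {
        return topicToBrokers.getOrDefault(topic, Collections.emptySet());
    }

    public static void main(String[] args) {
        ToyRouteTable table = new ToyRouteTable();
        table.register("broker-a", Arrays.asList("TopicTest", "SELF_TEST_TOPIC"));
        table.register("broker-b", Collections.singletonList("TopicTest"));
        System.out.println(new TreeSet<>(table.brokersFor("TopicTest"))); // [broker-a, broker-b]
    }
}
```

Removing brokers that stop sending heartbeats, which the article lists as the NameServer's heartbeat detection service, is deliberately not modeled here.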

1.1 Entry Point for Sending Heartbeats

The entry point is the BrokerController#start method:

/**
 * Method of BrokerController.
 * Starts the BrokerController.
 */
public void start() throws Exception {
    //start the message store service
    if (this.messageStore != null) {
        this.messageStore.start();
    }
    //start the Netty remoting server
    if (this.remotingServer != null) {
        this.remotingServer.start();
    }
    //start the fast Netty remoting server
    if (this.fastRemotingServer != null) {
        this.fastRemotingServer.start();
    }
    //start the file watch service
    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
    //start the broker's outer API (used to talk to the NameServers)
    if (this.brokerOuterAPI != null) {
        this.brokerOuterAPI.start();
    }
    //start the service that holds long-polling pull requests
    if (this.pullRequestHoldService != null) {
        this.pullRequestHoldService.start();
    }
    //start the client connection housekeeping (heartbeat) service
    if (this.clientHousekeepingService != null) {
        this.clientHousekeepingService.start();
    }
    //start the filter server manager
    if (this.filterServerManager != null) {
        this.filterServerManager.start();
    }
    //only when DLedger is not enabled (it is disabled by default)
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        //if this broker is not a SLAVE, start the transactionalMessageCheckService (transactional message check service)
        startProcessorByHa(messageStoreConfig.getBrokerRole());
        //if this broker is a SLAVE, start a scheduled task that pulls data from the master every 10s;
        //the synced data includes topic configs, consumer offsets, delay message offsets, subscription groups, etc.
        handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
        /*
         * force-register this broker's information with all NameServers
         */
        this.registerBrokerAll(true, false, true);
    }
    //start a scheduled task that, by default, registers with the NameServers every 30s;
    //the interval is set by registerNameServerPeriod and clamped to 10000-60000 ms
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                //periodically send the heartbeat and report data
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
    //start the broker statistics service
    if (this.brokerStatsManager != null) {
        this.brokerStatsManager.start();
    }
    //start the broker fast-failure service
    if (this.brokerFastFailure != null) {
        this.brokerFastFailure.start();
    }
}

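Near the end of start, a scheduled task is launched. After an initial delay of 10s, it calls registerBrokerAll every 30s by default to register this broker's information with all NameServers. The interval is configured by the registerNameServerPeriod property and clamped to the range 10000 to 60000 ms. This scheduled task is the Broker's heartbeat to the NameServers; each heartbeat carries, for every topic, its name, read and write queue counts, queue permission, whether it is ordered, and so on.

Before this scheduled task is created, registerBrokerAll is also called once directly (when DLedger is not enabled), forcing a registration as soon as the broker starts.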
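The schedule described in the two paragraphs above can be sketched with a plain ScheduledExecutorService. This is an illustration, not the Broker's code: HeartbeatScheduleSketch, clampPeriod and start are hypothetical names, the DLedger condition around the first registration is ignored, and the Runnable stands in for registerBrokerAll.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: one registration at startup, then a fixed-rate heartbeat with a clamped period. */
class HeartbeatScheduleSketch {
    /** Same clamp as Math.max(10000, Math.min(period, 60000)) in BrokerController#start. */
    static long clampPeriod(long configuredPeriodMs) {
        return Math.max(10_000L, Math.min(configuredPeriodMs, 60_000L));
    }

    /** Registers once right away, then keeps re-registering at a fixed rate. */
    static ScheduledExecutorService start(Runnable registerBrokerAll, long initialDelayMs, long periodMs) {
        registerBrokerAll.run(); // forced registration at startup
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            try {
                registerBrokerAll.run();
            } catch (Throwable t) {
                // swallow so that one failed heartbeat does not cancel all later runs
                System.err.println("registerBrokerAll failed: " + t);
            }
        }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
        return scheduler;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(clampPeriod(30_000)); // 30000 (the default period)
        System.out.println(clampPeriod(5_000));  // 10000
        System.out.println(clampPeriod(90_000)); // 60000

        // Demo with tiny intervals instead of the real 10 s delay and 30 s period, so it ends quickly.
        AtomicInteger calls = new AtomicInteger();
        ScheduledExecutorService s = start(calls::incrementAndGet, 10, 20);
        Thread.sleep(100);
        s.shutdownNow();
        System.out.println("heartbeats sent: " + calls.get());
    }
}
```

The try/catch inside the task matters: per the ScheduledExecutorService contract, if one execution of a fixed-rate task throws, all later executions are suppressed, which is presumably why BrokerController#start catches Throwable around registerBrokerAll.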

1.2 registerBrokerAll: Registering Broker Information

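registerBrokerAll registers the current Broker's information with all NameServers.

The actual registration is done by doRegisterBrokerAll. Before calling it, registerBrokerAll checks whether registration is needed: it registers if forceRegister is true (forced registration) or if needRegister reports that this broker should register.

Of the two registerBrokerAll calls in start, the first passes forceRegister = true explicitly, and the scheduled one passes brokerConfig.isForceRegister(), which defaults to true; so by default every registration from start is forced.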

/**
 * Method of BrokerController.
 * Registers the Broker's information with the NameServers (sends the heartbeat).
 *
 * @param checkOrderConfig whether to check ordered-topic configuration
 * @param oneway           whether to send one-way requests
 * @param forceRegister    whether to force registration
 */
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
    //build the topic-info transfer object from the topics in TopicConfigManager;
    //all topics were loaded earlier by topicConfigManager.load(), from user.home/store/config/topics.json
    TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
    //if the current broker is not writable or not readable
    if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
            || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
        ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
        for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
            //then overwrite each topic's permission with the broker's permission
            TopicConfig tmp =
                    new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                            this.brokerConfig.getBrokerPermission());
            topicConfigTable.put(topicConfig.getTopicName(), tmp);
        }
        topicConfigWrapper.setTopicConfigTable(topicConfigTable);
    }
    /*
     * register with the NameServers if forceRegister is true (forced registration)
     * or if this broker needs to register
     */
    if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.brokerConfig.getRegisterBrokerTimeoutMills())) {
        /*
         * perform the registration
         */
        doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
    }
}

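The permission override at the top of registerBrokerAll is easier to follow with the bit values in hand. The sketch below assumes RocketMQ's PermName layout of READ = 4, WRITE = 2, INHERIT = 1 (consistent with the perm values 6 and 7 in the topics.json sample later in this article); PermOverrideSketch and its nested Topic class are hypothetical stand-ins for PermName and TopicConfig.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the topic-permission override; bit values assumed to match PermName. */
class PermOverrideSketch {
    static final int PERM_READ = 1 << 2;  // 4
    static final int PERM_WRITE = 1 << 1; // 2
    static final int PERM_INHERIT = 1;    // 1

    static boolean isReadable(int perm) { return (perm & PERM_READ) == PERM_READ; }
    static boolean isWriteable(int perm) { return (perm & PERM_WRITE) == PERM_WRITE; }

    /** Minimal stand-in for TopicConfig: only the fields the override touches. */
    static final class Topic {
        final String name;
        final int readQueueNums;
        final int writeQueueNums;
        final int perm;

        Topic(String name, int readQueueNums, int writeQueueNums, int perm) {
            this.name = name;
            this.readQueueNums = readQueueNums;
            this.writeQueueNums = writeQueueNums;
            this.perm = perm;
        }
    }

    /** If the broker is not both readable and writable, every reported topic takes the broker's permission. */
    static Map<String, Topic> applyBrokerPermission(Map<String, Topic> topics, int brokerPerm) {
        if (isWriteable(brokerPerm) && isReadable(brokerPerm)) {
            return topics; // nothing to override
        }
        Map<String, Topic> result = new HashMap<>();
        for (Topic t : topics.values()) {
            // keep queue counts, replace only the permission
            result.put(t.name, new Topic(t.name, t.readQueueNums, t.writeQueueNums, brokerPerm));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Topic> topics = new HashMap<>();
        topics.put("TopicTest", new Topic("TopicTest", 4, 4, 6)); // 6 = READ | WRITE
        // a broker switched to read-only (perm 4) reports its topics as read-only too
        System.out.println(applyBrokerPermission(topics, PERM_READ).get("TopicTest").perm); // 4
        System.out.println(applyBrokerPermission(topics, 6).get("TopicTest").perm);         // 6
    }
}
```

The effect is that the NameServer receives every topic on a broker with reduced permission (for example read-only) carrying that same reduced permission.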
1.2.1 needRegister: Does the Broker Need to Register?

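needRegister decides whether the current broker has to register with the NameServers. When forceRegister is true, registration is forced and this method's result does not matter (because || short-circuits, it is not even called). When forceRegister is false, whether the broker registers depends entirely on this method's result.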
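The interplay between forceRegister and needRegister comes down to Java's short-circuiting ||. In this hypothetical sketch, a Supplier stands in for the QUERY_DATA_VERSION round trip that BrokerController#needRegister triggers; RegisterDecisionSketch, shouldRegister and anyChanged are illustrative names only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical sketch of "forceRegister || needRegister(...)" in registerBrokerAll. */
class RegisterDecisionSketch {
    static boolean shouldRegister(boolean forceRegister, Supplier<List<Boolean>> queryChangeList) {
        // || evaluates its right side lazily: the Supplier is not called when forceRegister is true
        return forceRegister || anyChanged(queryChangeList.get());
    }

    /** Equivalent aggregation to BrokerController#needRegister: one "changed" answer is enough. */
    static boolean anyChanged(List<Boolean> changeList) {
        for (Boolean changed : changeList) {
            if (Boolean.TRUE.equals(changed)) { // null-safe, unlike unboxing with if (changed)
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        AtomicInteger queries = new AtomicInteger();
        Supplier<List<Boolean>> query = () -> {
            queries.incrementAndGet(); // count simulated version queries
            return Arrays.asList(Boolean.TRUE);
        };
        System.out.println(shouldRegister(true, query) + ", queries=" + queries.get());  // true, queries=0
        System.out.println(shouldRegister(false, query) + ", queries=" + queries.get()); // true, queries=1
    }
}
```

When forceRegister is true the Supplier is never invoked, so a forced heartbeat skips the version query entirely.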

BrokerController#needRegister delegates to brokerOuterAPI#needRegister:

/**
 * Method of BrokerController.
 * <p>
 * Whether the broker needs to register with the NameServers.
 *
 * @param clusterName  cluster name
 * @param brokerAddr   broker address
 * @param brokerName   broker name
 * @param brokerId     broker ID
 * @param timeoutMills timeout in milliseconds
 * @return whether the broker needs to register with the NameServers
 */
private boolean needRegister(final String clusterName,
                             final String brokerAddr,
                             final String brokerName,
                             final long brokerId,
                             final int timeoutMills) {
    //build the topic-info transfer object from the topics in TopicConfigManager;
    //all topics were loaded earlier by topicConfigManager.load(), from user.home/store/config/topics.json
    TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
    /*
     * fetch the DataVersion from every NameServer and compare each with our own;
     * if any NameServer's DataVersion differs, register again
     */
    List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills);
    boolean needRegister = false;
    //a version mismatch with even one NameServer requires re-registration
    for (Boolean changed : changeList) {
        if (changed) {
            needRegister = true;
            break;
        }
    }
    return needRegister;
}

The logic of needRegister is simple: it sends a request (code QUERY_DATA_VERSION, i.e. 322) to every NameServer, obtains each NameServer's DataVersion, and compares it with its own. If any NameServer's version differs, the broker registers again.

/**
 * Method of BrokerOuterAPI.
 */
public List<Boolean> needRegister(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final int timeoutMills) {
    //a CopyOnWriteArrayList that collects the results of the requests
    final List<Boolean> changedList = new CopyOnWriteArrayList<>();
    //get all NameServer addresses
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        /*
                         * build the request header with this broker's information
                         */
                        QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();
                        requestHeader.setBrokerAddr(brokerAddr);
                        requestHeader.setBrokerId(brokerId);
                        requestHeader.setBrokerName(brokerName);
                        requestHeader.setClusterName(clusterName);
                        //build the remote request, code QUERY_DATA_VERSION (322)
                        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);
                        request.setBody(topicConfigWrapper.getDataVersion().encode());
                        RemotingCommand response = remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
                        DataVersion nameServerDataVersion = null;
                        Boolean changed = false;
                        switch (response.getCode()) {
                            case ResponseCode.SUCCESS: {
                                //the "changed" flag returned by the NameServer
                                QueryDataVersionResponseHeader queryDataVersionResponseHeader =
                                        (QueryDataVersionResponseHeader) response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
                                changed = queryDataVersionResponseHeader.getChanged();
                                byte[] body = response.getBody();
                                if (body != null) {
                                    //decode the NameServer's DataVersion
                                    nameServerDataVersion = DataVersion.decode(body, DataVersion.class);
                                    //if this broker's DataVersion differs from the NameServer's, an update is needed
                                    if (!topicConfigWrapper.getDataVersion().equals(nameServerDataVersion)) {
                                        changed = true;
                                    }
                                }
                                if (changed == null || changed) {
                                    changedList.add(Boolean.TRUE);
                                }
                            }
                            default:
                                break;
                        }
                        log.warn("Query data version from name server {} OK,changed {}, broker {},name server {}", namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion);
                    } catch (Exception e) {
                        changedList.add(Boolean.TRUE);
                        log.error("Query data version from name server {}  Exception, {}", namesrvAddr, e);
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });

        }
        try {
            countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("query dataversion from nameserver countDownLatch await Exception", e);
        }
    }
    return changedList;
}

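The concurrency pattern in BrokerOuterAPI#needRegister (one task per NameServer, results gathered in a CopyOnWriteArrayList, a CountDownLatch awaited with a timeout) can be reproduced without any RocketMQ classes. In this sketch each Callable<Boolean> is a hypothetical stand-in for the QUERY_DATA_VERSION call to one NameServer and returns its "changed" answer; FanOutSketch and queryAll are illustrative names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the parallel "ask every NameServer, wait with a timeout" pattern. */
class FanOutSketch {
    static List<Boolean> queryAll(Map<String, Callable<Boolean>> servers, ExecutorService pool, long timeoutMillis) {
        List<Boolean> changedList = new CopyOnWriteArrayList<>(); // safe for concurrent add()
        CountDownLatch latch = new CountDownLatch(servers.size());
        for (Map.Entry<String, Callable<Boolean>> server : servers.entrySet()) {
            pool.execute(() -> {
                try {
                    Boolean changed = server.getValue().call(); // stand-in for the remote query
                    if (changed == null || changed) {          // null counts as "changed", as in the original
                        changedList.add(Boolean.TRUE);
                    }
                } catch (Exception e) {
                    changedList.add(Boolean.TRUE);              // failed query: assume "changed"
                } finally {
                    latch.countDown();                          // always release the latch
                }
            });
        }
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS); // bounded wait for all answers
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();                 // preserve the interrupt status
        }
        return changedList;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Map<String, Callable<Boolean>> servers = new LinkedHashMap<>();
        servers.put("ns1:9876", () -> false); // versions match
        servers.put("ns2:9876", () -> true);  // NameServer holds a different version
        servers.put("ns3:9876", () -> { throw new Exception("connect timeout"); });
        List<Boolean> changed = queryAll(servers, pool, 3000);
        pool.shutdown();
        System.out.println(changed.size()); // 2: one real change plus one failed query
    }
}
```

Two details carry over from the original: a failed query is recorded as "changed", so an unreachable NameServer triggers re-registration, and countDown sits in finally so an exception never leaves the latch waiting out the full timeout. One deliberate difference: the sketch restores the thread's interrupt flag instead of only logging the InterruptedException. If the latch times out, NameServers that have not answered yet simply contribute nothing to the list.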
1.2.1.1 About DataVersion

DataVersion is RocketMQ's data version-control mechanism. Its structure is simple; its core fields and methods are:

/**
 * timestamp in milliseconds
 */
private long timestamp = System.currentTimeMillis();
/**
 * version counter
 */
private AtomicLong counter = new AtomicLong(0);

/**
 * copy the data of the given dataVersion; used when restoring data from file
 */
public void assignNewOne(final DataVersion dataVersion) {
    this.timestamp = dataVersion.timestamp;
    this.counter.set(dataVersion.counter.get());
}

/**
 * advance timestamp and counter to the next version
 */
public void nextVersion() {
    this.timestamp = System.currentTimeMillis();
    this.counter.incrementAndGet();
}

Calling nextVersion changes both timestamp and counter. Generally, nextVersion is called when a broker is newly created or when topic information is updated.
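Why a single nextVersion() call is enough to trigger re-registration can be seen with a minimal stand-in for DataVersion. SimpleDataVersion is a hypothetical class that copies the fields and methods shown above and adds an equals that, as we assume RocketMQ's DataVersion does, compares both timestamp and counter.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for DataVersion: equal only when both timestamp and counter match. */
class SimpleDataVersion {
    private long timestamp = System.currentTimeMillis();
    private final AtomicLong counter = new AtomicLong(0);

    /** Copy another version, e.g. when restoring from file or storing the broker's reported version. */
    void assignNewOne(SimpleDataVersion other) {
        this.timestamp = other.timestamp;
        this.counter.set(other.counter.get());
    }

    /** Move to the next version: new timestamp, counter + 1. */
    void nextVersion() {
        this.timestamp = System.currentTimeMillis();
        this.counter.incrementAndGet();
    }

    long getCounter() {
        return counter.get();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SimpleDataVersion)) return false;
        SimpleDataVersion that = (SimpleDataVersion) o;
        // AtomicLong does not override equals, so compare the numeric values
        return timestamp == that.timestamp && counter.get() == that.counter.get();
    }

    @Override
    public int hashCode() {
        return Long.hashCode(timestamp) * 31 + Long.hashCode(counter.get());
    }

    public static void main(String[] args) {
        SimpleDataVersion broker = new SimpleDataVersion();
        SimpleDataVersion nameServerCopy = new SimpleDataVersion();
        nameServerCopy.assignNewOne(broker);                // the copy a NameServer would hold
        System.out.println(broker.equals(nameServerCopy)); // true  -> no re-registration needed
        broker.nextVersion();                               // e.g. topic information was updated
        System.out.println(broker.equals(nameServerCopy)); // false -> the version check reports a change
    }
}
```

Because counter is incremented, two versions created within the same millisecond still differ after nextVersion(), so equality does not depend on the clock alone.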

The DataVersion and the topic configurations are persisted together in the topics.json file, in the following format:

{
   "dataVersion":{
      "counter":3,
      "timestamp":1651398321850
   },
   "topicConfigTable":{
      "SCHEDULE_TOPIC_XXXX":{
         "order":false,
         "perm":6,
         "readQueueNums":18,
         "topicFilterType":"SINGLE_TAG",
         "topicName":"SCHEDULE_TOPIC_XXXX",
         "topicSysFlag":0,
         "writeQueueNums":18
      },
      "TopicTest":{
         "order":false,
         "perm":6,
         "readQueueNums":4,
         "topicFilterType":"SINGLE_TAG",
         "topicName":"TopicTest",
         "topicSysFlag":0,
         "writeQueueNums":4
      },
      "SELF_TEST_TOPIC":{
         "order":false,
         "perm":6,
         "readQueueNums":1,
         "topicFilterType":"SINGLE_TAG",
         "topicName":"SELF_TEST_TOPIC",
         "topicSysFlag":0,
         "writeQueueNums":1
      },
      "DefaultCluster":{
         "order":false,
         "perm":7,
         "readQueueNums":16,
         "topicFilterType":"SINGLE_TAG",
         "topicName":"DefaultCluster",
         "topicSysFlag":0,
         "writeQueueNums":16
      },
      "DefaultCluster_REPLY_TOPIC":{
         "order":false,
         "perm":6,
         "readQueueNums":1,
         "topicFilterType":"SINGLE_TAG",
         "topicName":"DefaultCluster_REPLY_TOPIC",
         "topicSysFlag":0,
         "writeQueueNums":1
      },
      "RMQ_SYS_TRANS_HALF_TOPIC":{
         "order":false,
         "perm":6,
         "readQueueNums":1,
         "topicFilterType":"SINGLE_TAG",
         "topicName":"RMQ_SYS_TRANS_HALF_TOPIC",
         "topicSysFlag":0,
         "writeQueueNums":1
      },
      "broker-a":{
         "order":false,
         "perm":7,
         "readQueueNums":1,
         ...
