RocketMQ Source Code: Heartbeat Service Between Broker and NameServer
Posted 刘Java
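This article walks through the source code of the heartbeat service between RocketMQ's Broker and NameServer. It has three parts:
- how the Broker sends the heartbeat registration request;
- how the NameServer handles the heartbeat registration request;
- the NameServer's heartbeat detection service.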
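1 Broker Sends the Heartbeat Registration Request
During startup, the Broker establishes and keeps long-lived connections with every NameServer, then starts a scheduled task that periodically sends heartbeat packets. A heartbeat packet carries the current Broker's information (address, name, id, and so on) together with the configuration of every Topic stored on that Broker. Once registration succeeds, the NameServer cluster holds the mapping between Topics and Brokers.
1.1 Entry Point for Sending Heartbeats
The entry point is the BrokerController#start method: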
/**
 * Method of BrokerController
 * Starts the BrokerController
 */
public void start() throws Exception {
    //start the message store service
    if (this.messageStore != null) {
        this.messageStore.start();
    }
    //start the netty remoting server
    if (this.remotingServer != null) {
        this.remotingServer.start();
    }
    //start the fast netty remoting server
    if (this.fastRemotingServer != null) {
        this.fastRemotingServer.start();
    }
    //start the file watch service
    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
    //start the broker's outer API (the client the broker uses to talk to the NameServers)
    if (this.brokerOuterAPI != null) {
        this.brokerOuterAPI.start();
    }
    //start the long-polling pull request hold service
    if (this.pullRequestHoldService != null) {
        this.pullRequestHoldService.start();
    }
    //start the client connection housekeeping (heartbeat) service
    if (this.clientHousekeepingService != null) {
        this.clientHousekeepingService.start();
    }
    //start the filter server manager
    if (this.filterServerManager != null) {
        this.filterServerManager.start();
    }
    //runs only when the DLedger commit log is NOT enabled (DLedger is disabled by default)
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        //if not a SLAVE, start transactionalMessageCheckService, the transactional message check service
        startProcessorByHa(messageStoreConfig.getBrokerRole());
        //if a SLAVE, start a scheduled task that syncs data from the master every 10s, pulled actively by the slave;
        //synced data includes topic config, consumer offsets, delay message offsets, subscription group info, etc.
        handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
        /*
         * force registration of the current broker's information with all NameServers
         */
        this.registerBrokerAll(true, false, true);
    }
    //start the scheduled task: by default register with the NameServers every 30s (first run after 10s);
    //the interval comes from registerNameServerPeriod and is clamped to between 10000 and 60000 ms
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                //periodically send the heartbeat and report data
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
    //start the broker statistics services
    if (this.brokerStatsManager != null) {
        this.brokerStatsManager.start();
    }
    //start the broker fast-failure service
    if (this.brokerFastFailure != null) {
        this.brokerFastFailure.start();
    }
}
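Near the end of start, a scheduled task is started. By default it calls registerBrokerAll every 30s (the first run happens 10s after startup) to register this broker's information with all NameServers. The interval is set by the registerNameServerPeriod property and is clamped to the range 10,000 to 60,000 milliseconds. This scheduled task is the Broker's heartbeat to the NameServers; each heartbeat carries topic information such as the topic name, the number of read and write queues, queue permissions, and whether the topic is ordered.
Before this scheduled task is started, registerBrokerAll is also called once, so that the Broker is force-registered as soon as it first starts.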
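The scheduling logic above can be reproduced with the JDK alone. The sketch below is not RocketMQ code: HeartbeatScheduleSketch, clampPeriod and the counting task are hypothetical stand-ins for BrokerController and registerBrokerAll, and the delays are shrunk to milliseconds so the demo finishes quickly. Only the clamping expression is copied from start.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HeartbeatScheduleSketch {

    // Same clamping expression as BrokerController#start:
    // registerNameServerPeriod is forced into [10000, 60000] ms.
    static long clampPeriod(long registerNameServerPeriod) {
        return Math.max(10000, Math.min(registerNameServerPeriod, 60000));
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(clampPeriod(30000)); // 30000 (the default period)
        System.out.println(clampPeriod(5000));  // 10000 (raised to the lower bound)
        System.out.println(clampPeriod(90000)); // 60000 (cut to the upper bound)

        // Hypothetical stand-in for registerBrokerAll: it only counts invocations.
        AtomicInteger registrations = new AtomicInteger();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // One forced registration at startup, like registerBrokerAll(true, false, true).
        registrations.incrementAndGet();

        // Periodic heartbeat; 10 ms / 30 ms stand in for the real 10 s / 30 s.
        scheduler.scheduleAtFixedRate(() -> {
            try {
                registrations.incrementAndGet();
            } catch (Throwable e) {
                // Log and swallow, so one failure does not cancel the schedule.
                System.err.println("registerBrokerAll Exception: " + e);
            }
        }, 10, 30, TimeUnit.MILLISECONDS);

        Thread.sleep(100);
        scheduler.shutdown(); // periodic tasks are cancelled on shutdown
        System.out.println("registrations so far: " + registrations.get());
    }
}
```

The try/catch around the task body matters: according to the ScheduledExecutorService#scheduleAtFixedRate Javadoc, if any execution of the task throws, subsequent executions are suppressed, so an uncaught exception would silently stop the heartbeat. That is presumably why BrokerController catches Throwable and only logs it.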
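1.2 registerBrokerAll: Registering the Broker's Information
The registerBrokerAll method registers the current Broker's information with all NameServers.
The actual registration is performed by the internal doRegisterBrokerAll method. Before calling it, registerBrokerAll checks whether registration is needed: it registers if forceRegister is true (forced registration), or if needRegister reports that the broker should register.
In start, the initial registerBrokerAll call passes forceRegister = true explicitly, and the scheduled task passes brokerConfig.isForceRegister(), which defaults to true; so by default every heartbeat is a forced registration.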
/**
 * Method of BrokerController
 * Registers the broker's information with the NameServers, i.e. sends the heartbeat
 *
 * @param checkOrderConfig whether to check the ordered-topic configuration
 * @param oneway           whether to send the request one-way
 * @param forceRegister    whether to force registration
 */
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
    //build the topic config transport object from the topic info held by TopicConfigManager;
    //all topic info was loaded earlier by topicConfigManager.load(), from user.home/store/config/topics.json
    TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
    //if the broker's own permission is not writable or not readable
    if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
        || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
        ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
        for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
            //override each topic's permission with the broker's permission
            TopicConfig tmp =
                new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                    this.brokerConfig.getBrokerPermission());
            topicConfigTable.put(topicConfig.getTopicName(), tmp);
        }
        topicConfigWrapper.setTopicConfigTable(topicConfigTable);
    }
    /*
     * register with the NameServers if forceRegister is true (forced registration),
     * or if needRegister says this broker should register
     */
    if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.brokerConfig.getRegisterBrokerTimeoutMills())) {
        /*
         * perform the registration
         */
        doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
    }
}
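To make the permission branch in registerBrokerAll concrete, here is a minimal sketch. The bit values are assumed to match RocketMQ's PermName constants (read = 4, write = 2, inherit = 1), which is consistent with the perm values 6 and 7 that appear in the topics.json excerpt later in this article. TopicPermSketch and effectivePerms are hypothetical names, and a topic is reduced to a name-to-perm map.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TopicPermSketch {
    // Assumed to mirror RocketMQ's PermName bit flags.
    static final int PERM_READ = 1 << 2;  // 4
    static final int PERM_WRITE = 1 << 1; // 2
    static final int PERM_INHERIT = 1;    // 1

    static boolean isReadable(int perm) { return (perm & PERM_READ) == PERM_READ; }
    static boolean isWriteable(int perm) { return (perm & PERM_WRITE) == PERM_WRITE; }

    /**
     * Mirrors the branch in registerBrokerAll: if the broker itself is not both
     * readable and writable, every topic is reported with the broker's permission
     * instead of its own.
     */
    static Map<String, Integer> effectivePerms(Map<String, Integer> topicPerms, int brokerPerm) {
        if (isWriteable(brokerPerm) && isReadable(brokerPerm)) {
            return topicPerms; // reported unchanged
        }
        Map<String, Integer> overridden = new LinkedHashMap<>();
        for (String topic : topicPerms.keySet()) {
            overridden.put(topic, brokerPerm);
        }
        return overridden;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new LinkedHashMap<>();
        topics.put("TopicTest", PERM_READ | PERM_WRITE);                     // 6
        topics.put("DefaultCluster", PERM_READ | PERM_WRITE | PERM_INHERIT); // 7

        // A read/write broker (perm 6) reports topic permissions unchanged.
        System.out.println(effectivePerms(topics, 6));         // {TopicTest=6, DefaultCluster=7}
        // A read-only broker (perm 4) downgrades every topic to 4.
        System.out.println(effectivePerms(topics, PERM_READ)); // {TopicTest=4, DefaultCluster=4}
    }
}
```

Overriding the permission at report time, rather than rewriting topics.json, means a broker switched to read-only keeps its stored topic configuration while the NameServers stop routing writes to it.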
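1.2.1 needRegister: Deciding Whether to Register
This method decides whether the current broker needs to register with the NameServers. When forceRegister is true, registration is forced and the result of this method does not matter (because || short-circuits, needRegister is not even called). When forceRegister is false, whether the broker registers depends entirely on this method's result.
Internally it calls brokerOuterAPI#needRegister: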
/**
 * Method of BrokerController
 * <p>
 * Whether the broker needs to register with the NameServers
 *
 * @param clusterName  cluster name
 * @param brokerAddr   broker address
 * @param brokerName   broker name
 * @param brokerId     broker id
 * @param timeoutMills timeout in milliseconds
 * @return whether the broker needs to register with the NameServers
 */
private boolean needRegister(final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final int timeoutMills) {
    //build the topic config transport object from the topic info held by TopicConfigManager;
    //all topic info was loaded earlier by topicConfigManager.load(), from user.home/store/config/topics.json
    TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
    /*
     * get the DataVersion of every NameServer and compare it with our own;
     * re-register if any NameServer's DataVersion differs
     */
    List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills);
    boolean needRegister = false;
    //if the data version differs on any NameServer, re-register
    for (Boolean changed : changeList) {
        if (changed) {
            needRegister = true;
            break;
        }
    }
    return needRegister;
}
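Because BrokerOuterAPI#needRegister (the next listing) only ever adds Boolean.TRUE to the list, the loop above effectively asks "did any NameServer report a change?". A stream-based equivalent in a hypothetical helper class makes the edge case visible: an empty list, which is what the broker gets when no NameServer reported a change or when the latch wait expired before any task recorded a result, means no re-registration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class NeedRegisterAggregationSketch {
    // Same decision as the loop in BrokerController#needRegister:
    // register again if at least one NameServer reported a change.
    static boolean needRegister(List<Boolean> changeList) {
        return changeList.stream().anyMatch(Boolean::booleanValue);
    }

    public static void main(String[] args) {
        System.out.println(needRegister(Arrays.asList(false, true)));  // true
        System.out.println(needRegister(Arrays.asList(false, false))); // false
        System.out.println(needRegister(Collections.emptyList()));     // false: nothing reported
    }
}
```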
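The logic of BrokerOuterAPI#needRegister is also simple: it sends a request (request code QUERY_DATA_VERSION, 322) to every NameServer in parallel, obtains each NameServer's DataVersion, and compares it with the broker's own DataVersion. If any NameServer's data version differs, the broker registers again.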
/**
 * Method of BrokerOuterAPI
 */
public List<Boolean> needRegister(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final int timeoutMills) {
    //a CopyOnWriteArrayList collects the results written by the concurrent tasks
    final List<Boolean> changedList = new CopyOnWriteArrayList<>();
    //get all NameServer addresses
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        /*
                         * build the request header carrying the broker's information
                         */
                        QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();
                        requestHeader.setBrokerAddr(brokerAddr);
                        requestHeader.setBrokerId(brokerId);
                        requestHeader.setBrokerName(brokerName);
                        requestHeader.setClusterName(clusterName);
                        //build the remoting request, code QUERY_DATA_VERSION (322)
                        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);
                        request.setBody(topicConfigWrapper.getDataVersion().encode());
                        RemotingCommand response = remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
                        DataVersion nameServerDataVersion = null;
                        Boolean changed = false;
                        switch (response.getCode()) {
                            case ResponseCode.SUCCESS: {
                                //decode the custom response header
                                QueryDataVersionResponseHeader queryDataVersionResponseHeader =
                                    (QueryDataVersionResponseHeader) response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
                                changed = queryDataVersionResponseHeader.getChanged();
                                byte[] body = response.getBody();
                                if (body != null) {
                                    //get the NameServer's DataVersion
                                    nameServerDataVersion = DataVersion.decode(body, DataVersion.class);
                                    //if the broker's DataVersion differs from the NameServer's, an update is needed
                                    if (!topicConfigWrapper.getDataVersion().equals(nameServerDataVersion)) {
                                        changed = true;
                                    }
                                }
                                if (changed == null || changed) {
                                    changedList.add(Boolean.TRUE);
                                }
                            }
                            default:
                                break;
                        }
                        log.warn("Query data version from name server {} OK,changed {}, broker {},name server {}", namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion);
                    } catch (Exception e) {
                        //a failed query is treated as "changed"
                        changedList.add(Boolean.TRUE);
                        log.error("Query data version from name server {} Exception, {}", namesrvAddr, e);
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }
        try {
            countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("query dataversion from nameserver countDownLatch await Exception", e);
        }
    }
    return changedList;
}
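The fan-out-and-wait pattern used here (one task per NameServer, a CountDownLatch bounded by timeoutMills, and a thread-safe list for results) can be exercised without a network. In this sketch queryChanged is a hypothetical stand-in for the QUERY_DATA_VERSION round trip and the addresses are made up; as in the original, a failed query counts as "changed".

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class FanOutQuerySketch {

    /**
     * Queries every address in parallel and waits at most timeoutMillis.
     * queryChanged (hypothetical) returns true if that server holds a different version.
     */
    static List<Boolean> queryAll(List<String> addrs, Function<String, Boolean> queryChanged,
                                  ExecutorService pool, long timeoutMillis) {
        final List<Boolean> changedList = new CopyOnWriteArrayList<>();
        final CountDownLatch latch = new CountDownLatch(addrs.size());
        for (final String addr : addrs) {
            pool.execute(() -> {
                try {
                    if (queryChanged.apply(addr)) {
                        changedList.add(Boolean.TRUE);
                    }
                } catch (Exception e) {
                    // Like BrokerOuterAPI: a failed query counts as "changed".
                    changedList.add(Boolean.TRUE);
                } finally {
                    latch.countDown();
                }
            });
        }
        try {
            // Servers that have not answered by the deadline contribute nothing.
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return changedList;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // Hypothetical: ns-a is in sync, ns-b is stale, ns-c is unreachable.
        List<String> addrs = List.of("ns-a:9876", "ns-b:9876", "ns-c:9876");
        Function<String, Boolean> query = addr -> {
            if (addr.startsWith("ns-c")) {
                throw new IllegalStateException("connect timeout");
            }
            return addr.startsWith("ns-b");
        };
        List<Boolean> result = queryAll(addrs, query, pool, 1000);
        pool.shutdown();
        System.out.println(result); // [true, true]: ns-b is stale, ns-c failed
    }
}
```

One deliberate difference from the original: on InterruptedException the sketch restores the thread's interrupt flag instead of only logging, which is the usual JDK idiom.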
1.2.1.1 Introduction to DataVersion
DataVersion is RocketMQ's data versioning mechanism. Its structure is fairly simple; the core fields and methods are:
/**
 * timestamp in milliseconds
 */
private long timestamp = System.currentTimeMillis();
/**
 * version counter
 */
private AtomicLong counter = new AtomicLong(0);
/**
 * copy the data of the given dataVersion; used when restoring data from file
 */
public void assignNewOne(final DataVersion dataVersion) {
    this.timestamp = dataVersion.timestamp;
    this.counter.set(dataVersion.counter.get());
}
/**
 * update the timestamp and advance counter to the next version
 */
public void nextVersion() {
    this.timestamp = System.currentTimeMillis();
    this.counter.incrementAndGet();
}
Calling nextVersion changes both timestamp and counter. In general, nextVersion is called when a broker is newly created or when topic information is updated.
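The versioning rule can be checked in isolation. The class below is a stand-alone sketch (DataVersionSketch is a hypothetical name) that copies the two fields and two methods shown above; equals is assumed to compare both timestamp and the counter value, which is what RocketMQ's own DataVersion#equals compares. main mimics a NameServer holding a copy of the broker's version.

```java
import java.util.concurrent.atomic.AtomicLong;

public class DataVersionSketch {
    private long timestamp = System.currentTimeMillis();
    private final AtomicLong counter = new AtomicLong(0);

    // Copy another version's state, as assignNewOne does when loading from file.
    public void assignNewOne(DataVersionSketch other) {
        this.timestamp = other.timestamp;
        this.counter.set(other.counter.get());
    }

    // Advance to the next version: new timestamp, counter + 1.
    public void nextVersion() {
        this.timestamp = System.currentTimeMillis();
        this.counter.incrementAndGet();
    }

    public long getCounter() {
        return counter.get();
    }

    // Two versions are equal only if both timestamp and counter match.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof DataVersionSketch)) return false;
        DataVersionSketch that = (DataVersionSketch) o;
        return timestamp == that.timestamp && counter.get() == that.counter.get();
    }

    @Override
    public int hashCode() {
        return 31 * Long.hashCode(timestamp) + Long.hashCode(counter.get());
    }

    public static void main(String[] args) {
        DataVersionSketch broker = new DataVersionSketch();
        DataVersionSketch nameServerCopy = new DataVersionSketch();
        nameServerCopy.assignNewOne(broker);               // what the NameServer last saw
        System.out.println(broker.equals(nameServerCopy)); // true: no re-registration needed

        broker.nextVersion();                              // e.g. a topic was created or updated
        System.out.println(broker.equals(nameServerCopy)); // false: needRegister reports a change
    }
}
```

Because counter is incremented on every change, two versions differ after nextVersion even if the clock did not move, so the timestamp alone is never relied on.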
The DataVersion and the topic configuration are both persisted to the topics.json file, in the following format:
{
    "dataVersion":{
        "counter":3,
        "timestamp":1651398321850
    },
    "topicConfigTable":{
        "SCHEDULE_TOPIC_XXXX":{
            "order":false,
            "perm":6,
            "readQueueNums":18,
            "topicFilterType":"SINGLE_TAG",
            "topicName":"SCHEDULE_TOPIC_XXXX",
            "topicSysFlag":0,
            "writeQueueNums":18
        },
        "TopicTest":{
            "order":false,
            "perm":6,
            "readQueueNums":4,
            "topicFilterType":"SINGLE_TAG",
            "topicName":"TopicTest",
            "topicSysFlag":0,
            "writeQueueNums":4
        },
        "SELF_TEST_TOPIC":{
            "order":false,
            "perm":6,
            "readQueueNums":1,
            "topicFilterType":"SINGLE_TAG",
            "topicName":"SELF_TEST_TOPIC",
            "topicSysFlag":0,
            "writeQueueNums":1
        },
        "DefaultCluster":{
            "order":false,
            "perm":7,
            "readQueueNums":16,
            "topicFilterType":"SINGLE_TAG",
            "topicName":"DefaultCluster",
            "topicSysFlag":0,
            "writeQueueNums":16
        },
        "DefaultCluster_REPLY_TOPIC":{
            "order":false,
            "perm":6,
            "readQueueNums":1,
            "topicFilterType":"SINGLE_TAG",
            "topicName":"DefaultCluster_REPLY_TOPIC",
            "topicSysFlag":0,
            "writeQueueNums":1
        },
        "RMQ_SYS_TRANS_HALF_TOPIC":{
            "order":false,
            "perm":6,
            "readQueueNums":1,
            "topicFilterType":"SINGLE_TAG",
            "topicName":"RMQ_SYS_TRANS_HALF_TOPIC",
            "topicSysFlag":0,
            "writeQueueNums":1
        },
        "broker-a":{
            "order":false,
            "perm":7,
            "readQueueNums":1,
            ...
        }
    }
}