RocketMQ Source Code Series: NameServer Core Source Code Analysis
Posted by Dream_it_possible!
RocketMQ version: 4.8.0
I. Introduction to NameServer
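NameServer is one of RocketMQ's core components. Much like ZooKeeper, it is distributed by nature. In RocketMQ it is responsible for route registration and discovery and for dynamically maintaining broker information. NameServer provides no master-slave synchronization mechanism, but it does guarantee eventual consistency of its data.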
II. NameServer Features
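- Dynamic route registration and discovery. When a broker starts, it registers its brokerAddr with NameServer. Route discovery means that clients periodically pull the latest route information for a topic from NameServer.
- Dynamic removal. Every 10 s, NameServer scans all brokers and removes any broker that has become unavailable from the address list.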
III. NameServer Architecture
Below is the RocketMQ deployment diagram:
Core principles
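When a broker (message server) starts, it automatically registers itself with NameServer. When a producer sends a message, it uses load balancing to pick one broker from the addresses obtained from NameServer. NameServer keeps a long-lived connection with every broker: each broker sends a heartbeat to NameServer every 30 s, and NameServer checks every 10 s whether the brokers are still alive. A broker is considered dead when the difference between the current system time and its last heartbeat, as recorded in brokerLiveTable, exceeds 120 s; the broker is then removed from the address list.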
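To make the timing concrete, the following worked example replays the rules stated above as a small simulation (it is not RocketMQ code): the scan starts 5 s after NameServer starts and repeats every 10 s, and a broker is expired once lastUpdateTimestamp + 120 s < now. The class and method names are hypothetical, and thread-scheduling jitter is ignored.

```java
// Hypothetical simulation of the NameServer liveness check described above.
// Assumptions taken from the text: first scan 5 s after startup, then every 10 s;
// a broker is expired when lastHeartbeat + 120 s < now (strict comparison).
final class ExpiryTimeline {
    static final long INITIAL_DELAY_MS = 5_000L;
    static final long SCAN_PERIOD_MS = 10_000L;
    static final long BROKER_CHANNEL_EXPIRED_TIME = 120_000L;

    /** Time (ms since NameServer start) of the first scan that removes the broker. */
    static long firstRemovingScan(long lastHeartbeatMs) {
        long scan = INITIAL_DELAY_MS;
        // Step through scan instants until the expiry condition becomes true.
        while (!(lastHeartbeatMs + BROKER_CHANNEL_EXPIRED_TIME < scan)) {
            scan += SCAN_PERIOD_MS;
        }
        return scan;
    }
}
```

For a broker whose last heartbeat arrived at t = 0, the first removing scan runs at t = 125 s. In general the route stays visible for more than 120 s and at most 130 s after the last heartbeat, which corresponds to roughly four missed 30 s heartbeats.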
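This design keeps NameServer simple and relies on fault-tolerance mechanisms on the sending side to keep message sending highly available.
NameServer can be deployed as a cluster for high availability. Because the nodes do not synchronize with each other, two nodes may return inconsistent data at the same moment, but they converge over time (eventual consistency). This trade-off keeps NameServer simple and efficient.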
IV. NameServer Project Layout
The project directory structure, with notes:
namesrv
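├─ NamesrvController.java // initialization logic: loads config, registers processors, etc.
├─ NamesrvStartup.java // NameServer entry point; starts the Netty server
├─ kvconfig
│ ├─ KVConfigManager.java // manages namespace-scoped KV configuration
│ └─ KVConfigSerializeWrapper.java // wrapper used to serialize the KV config to/from JSON
├─ processor
│ ├─ ClusterTestRequestProcessor.java // request processor used when clusterTest mode is enabled
│ └─ DefaultRequestProcessor.java // default request processor; dispatches requests by request code
└─ routeinfo
├─ BrokerHousekeepingService.java // listens for Netty channel events (close, exception, idle)
└─ RouteInfoManager.java // route manager; maintains topic, broker,
//clusterName, brokerAddr and related information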
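A quick look at this layout shows that Netty is at the heart of RocketMQ's network communication, so it is well worth knowing Netty's common usage.
V. NameServer Startup Flow
1) Load the configuration
Load namesrvConfig and nettyServerConfig. Settings supplied manually also take effect: command-line parameters are wrapped in Options (Apache Commons CLI) and can be passed as Program arguments before running, for example -c (path to a config properties file) or -p (print all config items).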
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
....
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
return controller;
}
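Whatever comes from -c or the command line ends up as java.util.Properties that are copied onto namesrvConfig and nettyServerConfig (NamesrvStartup uses MixAll.properties2Object for this). The sketch below is a hypothetical, simplified binder written for illustration, not RocketMQ's implementation: it matches each public one-argument setXxx(...) method to a property named xxx. DemoServerConfig is a made-up stand-in for NettyServerConfig, and its default values are illustrative.

```java
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical, simplified properties binder (not RocketMQ's code): for each public
// one-argument setXxx method, read property "xxx" and convert it to the setter's type.
final class PropertiesBinder {
    static void bind(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() <= 3 || m.getParameterCount() != 1) {
                continue;
            }
            // e.g. setListenPort -> listenPort
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String raw = props.getProperty(key);
            if (raw == null) {
                continue; // property not given: keep the default value
            }
            Object value = convert(raw.trim(), m.getParameterTypes()[0]);
            if (value == null) {
                continue; // parameter type not supported by this sketch
            }
            try {
                m.invoke(target, value);
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("cannot set " + key, e);
            }
        }
    }

    private static Object convert(String raw, Class<?> type) {
        if (type == int.class || type == Integer.class) return Integer.parseInt(raw);
        if (type == long.class || type == Long.class) return Long.parseLong(raw);
        if (type == boolean.class || type == Boolean.class) return Boolean.parseBoolean(raw);
        if (type == String.class) return raw;
        return null;
    }
}

// Hypothetical stand-in for NettyServerConfig with two of its settings.
class DemoServerConfig {
    private int listenPort = 8888;
    private int serverWorkerThreads = 8;
    public int getListenPort() { return listenPort; }
    public void setListenPort(int listenPort) { this.listenPort = listenPort; }
    public int getServerWorkerThreads() { return serverWorkerThreads; }
    public void setServerWorkerThreads(int n) { this.serverWorkerThreads = n; }
}
```

With -c, the file would first be read with Properties.load(...), and the resulting Properties passed to bind(...) once per config object.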
2) initialize()
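Before start() runs, NamesrvController does some preparation in initialize(): loading configuration, creating the Netty server instance, registering the request processor, scheduling the scan for disconnected brokers, and so on.
See the comments below: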
public boolean initialize() {
// load the KV configuration, including custom settings
this.kvConfigManager.load();
// create the Netty server; brokerHousekeepingService listens for channel events
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// thread pool that executes incoming remoting requests
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// register the request processor (see the implementations of the NettyRequestProcessor interface)
this.registerProcessor();
// every 10 s (after a 5 s initial delay), scan for and remove inactive brokers
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// print all KV configs every 10 minutes (after a 1 minute initial delay)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
// when TLS is enabled, watch the certificate and key files and reload the SSL context on change
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
});
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
return true;
}
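If initialize() returns false, check that the related configuration is correct (NamesrvStartup then shuts the controller down and exits). If it returns true, the last step, controller.start(), is executed, and NameServer is officially up.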
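The TLS listener inside initialize() encodes a small rule: a change to the trust certificate reloads the SSL context immediately, while the server certificate and private key only trigger a reload after both have changed, presumably so that a new certificate is never paired with the old key. The hypothetical class below pulls that rule out of the anonymous listener so it can be exercised without Netty; instead of calling loadSslContext(), it only counts reloads.

```java
// Hypothetical extraction of the FileWatchService.Listener logic from initialize().
final class CertReloadTracker {
    private final String certPath;
    private final String keyPath;
    private final String trustCertPath;
    private boolean certChanged;
    private boolean keyChanged;
    private int reloads; // how many times the SSL context would have been reloaded

    CertReloadTracker(String certPath, String keyPath, String trustCertPath) {
        this.certPath = certPath;
        this.keyPath = keyPath;
        this.trustCertPath = trustCertPath;
    }

    void onChanged(String path) {
        if (path.equals(trustCertPath)) {
            reloads++; // trust certificate: reload right away
        }
        if (path.equals(certPath)) {
            certChanged = true;
        }
        if (path.equals(keyPath)) {
            keyChanged = true;
        }
        if (certChanged && keyChanged) {
            certChanged = keyChanged = false; // reset, then reload once for the pair
            reloads++;
        }
    }

    int reloads() {
        return reloads;
    }
}
```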
3) Start the server
Next, let's see from the source what start() does:
public void start() throws Exception {
// 1. start the Netty server
this.remotingServer.start();
// 2. start the file-watch thread that detects changes to the TLS certificate files
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
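Stepping through with a debugger shows that execution first enters NettyRemotingServer.start(), which sets up the Netty server: it configures the ServerBootstrap (thread groups, channel options and handler pipeline) and binds the listening port.
Next, a thread is started to run FileWatchService's run() method.
Once startup succeeds, a message containing "boot success" is printed to the console.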
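As far as we can tell from the 4.8.0 source, FileWatchService is a background thread that periodically re-hashes the watched files and calls its listener when a file's content digest changes, rather than relying on OS file notifications. The sketch below shows that polling-and-hashing idea with plain JDK classes; PollingFileWatcher and checkOnce() are hypothetical names, and a real watcher would call checkOnce() in a loop with a short sleep.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical polling watcher: compares an MD5 digest of each file between rounds.
final class PollingFileWatcher {
    private final Map<Path, byte[]> lastDigest = new HashMap<>();
    private final Consumer<Path> listener;

    PollingFileWatcher(List<Path> files, Consumer<Path> listener) {
        this.listener = listener;
        for (Path f : files) {
            lastDigest.put(f, digest(f)); // remember the initial content
        }
    }

    /** One polling round: notify the listener for every file whose digest changed. */
    void checkOnce() {
        for (Map.Entry<Path, byte[]> e : lastDigest.entrySet()) {
            byte[] now = digest(e.getKey());
            if (!Arrays.equals(now, e.getValue())) {
                e.setValue(now); // updating a value is not a structural change
                listener.accept(e.getKey());
            }
        }
    }

    private static byte[] digest(Path file) {
        try {
            return MessageDigest.getInstance("MD5").digest(Files.readAllBytes(file));
        } catch (IOException | NoSuchAlgorithmException ex) {
            return new byte[0]; // unreadable file: treated as empty in this sketch
        }
    }
}
```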
VI. NameServer Core Source Code Analysis
1. Route registration
1) The broker sends heartbeats to NameServer
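In BrokerController.start(), the broker sends heartbeats to NameServer by calling BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()). The call is driven by the scheduledExecutorService thread pool: after a 10 s initial delay, it runs every brokerConfig.getRegisterNameServerPeriod() milliseconds (30 s by default), clamped to the range 10 s to 60 s.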
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
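The period argument of scheduleAtFixedRate() above clamps the configured value, so registerNameServerPeriod only takes effect between 10 s and 60 s. The helper below is a hypothetical restatement of that expression, plus a check of how many heartbeats fit into NameServer's 120 s expiry window.

```java
// Hypothetical helper restating the heartbeat-period clamp used above.
final class HeartbeatPeriod {
    static final long EXPIRY_MS = 120_000L; // BROKER_CHANNEL_EXPIRED_TIME on the NameServer side

    // Same clamp as Math.max(10000, Math.min(period, 60000)): at least 10 s, at most 60 s.
    static long effectivePeriodMs(long configuredMs) {
        return Math.max(10_000L, Math.min(configuredMs, 60_000L));
    }

    // Number of whole heartbeat periods that fit into one expiry window.
    static long heartbeatsPerExpiryWindow(long configuredMs) {
        return EXPIRY_MS / effectivePeriodMs(configuredMs);
    }
}
```

With the default 30 s, four heartbeats fit into one expiry window; at the 60 s upper bound only two do, and a configured 5 s is raised to 10 s.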
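The call then goes into doRegisterBrokerAll() and on to BrokerOuterAPI.registerBrokerAll(), which wraps the broker's information in a RegisterBrokerRequestHeader. Its main properties are:
- brokerName: name of the broker.
- brokerAddr: address of the broker.
- clusterName: name of the cluster the broker belongs to.
- haServerAddr: address of the broker's HA (master-slave replication) service; for a slave, NameServer answers with the master's haServerAddr.
- brokerId: 0 means the broker is a master; a value greater than 0 means it is a slave.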
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
final boolean compressed) {
final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
// wrap the broker information
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
// one count per NameServer; each task counts down whether its registration succeeds or fails
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
// register this broker with every NameServer in parallel
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
if (result != null) {
registerBrokerResultList.add(result);
}
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}
try {
// wait at most timeoutMills; registerBrokerTimeoutMills in BrokerConfig defaults to 6000 (6 s)
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
return registerBrokerResultList;
}
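At its core, registerBrokerAll() is a parallel fan-out with a bounded wait: one task per NameServer, results collected in a thread-safe list, and a CountDownLatch that every task counts down in finally. The sketch below isolates that pattern with JDK classes only; FanOutRegistrar and the registerOne callback are hypothetical stand-ins for BrokerOuterAPI and registerBroker(). Unlike the original, it restores the thread's interrupt flag instead of silently swallowing InterruptedException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical reduction of registerBrokerAll(): parallel requests plus a bounded wait.
final class FanOutRegistrar {
    static List<String> registerAll(List<String> nameServers,
                                    Function<String, String> registerOne,
                                    long timeoutMs) {
        List<String> results = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(nameServers.size());
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, nameServers.size()));
        try {
            for (String addr : nameServers) {
                pool.execute(() -> {
                    try {
                        String result = registerOne.apply(addr);
                        if (result != null) {
                            results.add(result);
                        }
                    } catch (RuntimeException e) {
                        // a failing NameServer is skipped (the original code logs a warning)
                    } finally {
                        latch.countDown(); // always release, even on failure
                    }
                });
            }
            latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        } finally {
            pool.shutdownNow();
        }
        return new ArrayList<>(results); // snapshot of what arrived in time
    }
}
```

Because every task counts down in finally, a NameServer that throws still releases the latch, so the broker waits only as long as the slowest reachable NameServer or the timeout, whichever comes first.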
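2) NameServer processes the heartbeat
The DefaultRequestProcessor first parses the request type. For RequestCode.REGISTER_BROKER, it calls its own registerBroker() (or registerBrokerWithFilterServer() for brokers at V3_0_11 or later), shown below; that method verifies the body checksum, decodes the topic configuration, and hands the actual registration to RouteInfoManager.registerBroker().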
public RemotingCommand registerBroker(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
final RegisterBrokerRequestHeader requestHeader =
(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
if (!checksum(ctx, request, requestHeader)) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("crc32 not match");
return response;
}
// decode the request body (topic configuration)
TopicConfigSerializeWrapper topicConfigWrapper;
if (request.getBody() != null) {
topicConfigWrapper = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class);
} else {
topicConfigWrapper = new TopicConfigSerializeWrapper();
topicConfigWrapper.getDataVersion().setCounter(new AtomicLong(0));
topicConfigWrapper.getDataVersion().setTimestamp(0);
}
// register the broker through RouteInfoManager
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
topicConfigWrapper,
null,
ctx.channel()
);
// build the response for the broker
responseHeader.setHaServerAddr(result.getHaServerAddr());
responseHeader.setMasterAddr(result.getMasterAddr());
byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
response.setBody(jsonValue);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
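The checksum step pairs with the broker side: the broker stores UtilAll.crc32(body) in bodyCrc32, and NameServer recomputes it before trusting the body. The sketch below uses java.util.zip.CRC32 and masks the result to a non-negative int, which is what we understand UtilAll.crc32 to do; BodyChecksum is a hypothetical name.

```java
import java.util.zip.CRC32;

// Hypothetical sketch of the bodyCrc32 handshake between broker and NameServer.
final class BodyChecksum {
    /** Broker side: checksum written into the request header. */
    static int crc32(byte[] body) {
        CRC32 crc = new CRC32();
        crc.update(body, 0, body.length);
        return (int) (crc.getValue() & 0x7FFFFFFF); // keep it a non-negative int
    }

    /** NameServer side: recompute and compare with the value from the header. */
    static boolean matches(byte[] body, int headerCrc) {
        return crc32(body) == headerCrc;
    }
}
```

For the standard check input "123456789", CRC32 yields 0xCBF43926, so the masked value is 0x4BF43926.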
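RouteInfoManager.registerBroker() ultimately records the broker in clusterAddrTable, brokerAddrTable, brokerLiveTable and filterServerTable; for a master whose topic configuration changed (or that registers for the first time), it also updates topicQueueTable.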
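To picture what those tables hold, here is a heavily simplified, hypothetical model of three of them, guarded by a read-write lock in the same spirit as RouteInfoManager. The real tables store richer objects such as BrokerData and BrokerLiveInfo, and topicQueueTable and filterServerTable are left out. A heartbeat from an already-known broker only refreshes its timestamp.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified model of three RouteInfoManager tables.
final class MiniRouteTables {
    // clusterName -> brokerNames in that cluster
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();
    // brokerName -> (brokerId -> brokerAddr); brokerId 0 is the master
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();
    // brokerAddr -> last heartbeat timestamp (ms)
    final Map<String, Long> brokerLiveTable = new HashMap<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    /** Returns true when this brokerId had no address yet (a first registration). */
    boolean register(String cluster, String brokerName, long brokerId, String brokerAddr, long nowMs) {
        lock.writeLock().lock();
        try {
            clusterAddrTable.computeIfAbsent(cluster, k -> new HashSet<>()).add(brokerName);
            Map<Long, String> addrs = brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>());
            String old = addrs.put(brokerId, brokerAddr);
            brokerLiveTable.put(brokerAddr, nowMs); // every heartbeat refreshes the timestamp
            return old == null;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Master address of a broker group, or null if no master is registered. */
    String masterAddr(String brokerName) {
        lock.readLock().lock();
        try {
            Map<Long, String> addrs = brokerAddrTable.get(brokerName);
            return addrs == null ? null : addrs.get(0L);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```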
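2. Route removal
RouteInfoManager.scanNotActiveBroker() is driven by a single-threaded scheduled thread pool that scans all brokers every 10 s; the task is scheduled in NamesrvController.initialize(). The pool comes from newSingleThreadScheduledExecutor and therefore has exactly one thread, which keeps resource usage low: scanning brokers is cheap, so one thread is enough.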
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
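How does NameServer decide that a broker is dead?
Following the source into scanNotActiveBroker(), the rule is: if current timestamp - last update timestamp > 120 s, the broker is considered dead. BROKER_CHANNEL_EXPIRED_TIME defaults to 120 s. Since a broker sends a heartbeat every 30 s, crossing this threshold means roughly four consecutive heartbeats have been missed, which reliably indicates that the broker is gone.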
public void scanNotActiveBroker() {
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
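For each expired broker, the code closes the corresponding Netty channel, removes the entry from brokerLiveTable, and calls onChannelDestroy() to clean up the broker's entries in the other route tables. brokerLiveTable is a HashMap owned by RouteInfoManager.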
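The following hypothetical sketch combines the expiry check with the cascading cleanup, using an injected clock so it can be tested. It models only brokerLiveTable and brokerAddrTable; the real onChannelDestroy() also cleans clusterAddrTable, topicQueueTable and filterServerTable under the write lock, and the real scan closes the channel.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of scanNotActiveBroker() plus a reduced onChannelDestroy().
final class MiniExpiryScanner {
    static final long BROKER_CHANNEL_EXPIRED_TIME = 1000L * 60 * 2; // 120 s, as in the text

    // brokerAddr -> last heartbeat (ms)
    final Map<String, Long> brokerLiveTable = new HashMap<>();
    // brokerName -> (brokerId -> brokerAddr)
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();

    /** Removes expired brokers and returns their addresses; `now` is injected for testability. */
    List<String> scanNotActiveBroker(long now) {
        List<String> removed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() + BROKER_CHANNEL_EXPIRED_TIME < now) {
                String addr = e.getKey();
                it.remove();            // 1) drop the liveness entry
                removed.add(addr);
                onChannelDestroy(addr); // 2) cascade into the other route tables
            }
        }
        return removed;
    }

    private void onChannelDestroy(String brokerAddr) {
        Iterator<Map.Entry<String, Map<Long, String>>> groups = brokerAddrTable.entrySet().iterator();
        while (groups.hasNext()) {
            Map<Long, String> addrs = groups.next().getValue();
            addrs.values().removeIf(brokerAddr::equals);
            if (addrs.isEmpty()) {
                groups.remove(); // no address left: the whole broker group disappears
            }
        }
    }
}
```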
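3. Route discovery
RocketMQ's route discovery is not real-time: when a topic's route changes, NameServer does not push the change to clients; instead, clients periodically pull the latest route for a topic. The request code for pulling a topic's latest route is GET_ROUTEINFO_BY_TOPIC.
In DefaultRequestProcessor, the processRequest() method handles incoming Netty requests. When the request code is GET_ROUTEINFO_BY_TOPIC, it calls this.getRouteInfoByTopic(ctx, request).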
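A pull-based design puts the refresh loop on the client. The hypothetical sketch below shows that client-side half: fetch the route for a topic and replace the cached copy only when it differs. Routes are plain strings here, and the fetch function stands in for a GET_ROUTEINFO_BY_TOPIC request; in the real client the refresh is scheduled periodically (every 30 s by default via ClientConfig's pollNameServerInterval, as far as we know).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical client-side route cache refreshed by polling.
final class RoutePoller {
    private final Map<String, String> cache = new HashMap<>();
    private final Function<String, String> fetchFromNameServer; // topic -> route, or null if unknown

    RoutePoller(Function<String, String> fetchFromNameServer) {
        this.fetchFromNameServer = fetchFromNameServer;
    }

    /** One polling round for one topic; returns true if the cached route changed. */
    boolean refresh(String topic) {
        String latest = fetchFromNameServer.apply(topic);
        if (latest == null) {
            return false; // treated like TOPIC_NOT_EXIST: keep the old route
        }
        if (Objects.equals(cache.get(topic), latest)) {
            return false; // unchanged, nothing to update
        }
        cache.put(topic, latest);
        return true;
    }

    String route(String topic) {
        return cache.get(topic);
    }
}
```

Keeping the old route when NameServer has no data for the topic is a choice made for this sketch; it lets a client ride out a briefly inconsistent NameServer node.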
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
if (ctx != null) {
log.debug("receive request, {} {} {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}
// dispatch to the business logic by request code
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.QUERY_DATA_VERSION:
return queryBrokerTopicConfig(ctx, request);
// register a broker
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
// unregister a broker
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
// get the route for a topic
case RequestCode.GET_ROUTEINFO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request);
case RequestCode.UPDATE_NAMESRV_CONFIG:
return this.updateConfig(ctx, request);
case RequestCode.GET_NAMESRV_CONFIG:
return this.getConfig(ctx, request);
default:
break;
}
return null;
}
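The switch above is reached because NameServer registers DefaultRequestProcessor as the remoting server's default processor. On the remoting side, the server keeps a processorTable keyed by request code and falls back to the default processor when no code-specific entry exists. The hypothetical ProcessorRouter below models just that lookup, with processors reduced to IntFunction<String>.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntFunction;

// Hypothetical model of request-code routing with a default-processor fallback.
final class ProcessorRouter {
    private final Map<Integer, IntFunction<String>> processorTable = new HashMap<>();
    // With no default registered, unknown codes yield null, like `return null` above.
    private IntFunction<String> defaultProcessor = code -> null;

    void registerProcessor(int requestCode, IntFunction<String> processor) {
        processorTable.put(requestCode, processor);
    }

    void registerDefaultProcessor(IntFunction<String> processor) {
        this.defaultProcessor = processor;
    }

    /** Code-specific processor first, default processor otherwise. */
    String dispatch(int requestCode) {
        return processorTable.getOrDefault(requestCode, defaultProcessor).apply(requestCode);
    }
}
```

Since NameServer registers no code-specific processors, every request lands in the default processor, and the switch on request.getCode() does the real dispatching.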
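getRouteInfoByTopic(ctx, request) then does two main things:
1) Through RouteInfoManager.pickupTopicRouteData(), it gets all queues of the topic from topicQueueTable (List<QueueData>) and the brokers hosting those queues from brokerAddrTable (List<BrokerData>).
2) If order messages are enabled (orderMessageEnable), it looks up the topic's order-message configuration in the KV config and attaches it to the returned route (orderTopicConf).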
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
// 1. get the brokers and queues of the topic
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
if (topicRouteData != null) {
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
// 2. attach the order-message config of this topic (null if the topic has none)
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
requestHeader.getTopic());
topicRouteData.setOrderTopicConf(orderTopicConf);
}
byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
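pickupTopicRouteData() is essentially a join of two tables: the queues of the topic from topicQueueTable, then the broker addresses for each brokerName found in those queues from brokerAddrTable. The hypothetical sketch below reproduces that join with trimmed-down stand-ins for QueueData and BrokerData; a null result corresponds to the TOPIC_NOT_EXIST response above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the topic-route join done by pickupTopicRouteData().
final class MiniRoutePicker {
    // Trimmed-down stand-in for QueueData.
    static final class Queue {
        final String brokerName;
        final int readQueueNums;
        final int writeQueueNums;
        Queue(String brokerName, int readQueueNums, int writeQueueNums) {
            this.brokerName = brokerName;
            this.readQueueNums = readQueueNums;
            this.writeQueueNums = writeQueueNums;
        }
    }

    // Trimmed-down stand-in for TopicRouteData.
    static final class Route {
        final List<Queue> queues = new ArrayList<>();
        final Map<String, Map<Long, String>> brokers = new HashMap<>(); // brokerName -> (id -> addr)
    }

    final Map<String, List<Queue>> topicQueueTable = new HashMap<>();
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();

    /** Null when the topic, or every broker hosting it, is unknown. */
    Route pickupTopicRouteData(String topic) {
        List<Queue> queues = topicQueueTable.get(topic);
        if (queues == null) {
            return null;
        }
        Route route = new Route();
        route.queues.addAll(queues);
        for (Queue q : queues) {
            Map<Long, String> addrs = brokerAddrTable.get(q.brokerName);
            if (addrs != null) {
                route.brokers.put(q.brokerName, new HashMap<>(addrs)); // copy, not a live view
            }
        }
        return route.brokers.isEmpty() ? null : route;
    }
}
```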