RocketMQ源码系列 NameServer 核心源码解析
Posted Dream_it_possible!
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码系列 NameServer 核心源码解析相关的知识,希望对你有一定的参考价值。
目录
rocketmq版本: 4.8.0
一、NameServer 介绍
NameServer 是rocketmq核心组件之一,与zookeeper一样天生具有分布式的特性,在rocketmq中担当着路由注册、发现、动态地维护broker相关信息的角色, NameServer 不提供Master-slave同步机制,但是能够保证数据的最终一致性。
二、NameServer 功能列表
- 动态路由发现和注册功能。broker 启动时,会将brokerAddr 注册到NameServer里, 路由发现是指客户端会定时的向NameServer根据topic拉取路由的最新信息。
- 动态剔除功能。每隔10 s NameServer 会自动扫描所有的broker, 如果有broker失效,那么会从地址列表里将其剔除掉。
三、NameServer 架构分析
下面是 rocketmq 的部署图
核心原理解析
Broker消息服务器启动时会自动向NameServer 注册信息,消息生产者在发送消息时,会在NameServer的地址列表里通过负载均衡选择一个Broker进行消息发送。 NameServer 与每台broker保持长连接,broker会每隔30s向NameServer发送一个心跳包,NameServer每间隔10s查看broker是否存活,如果broker挂掉了,判断挂掉的逻辑是brokerLiveTable检测上次的心跳包与当前系统时间的时间差,如果时间戳大于120s, 那么就将broker从服务地址列表里剔除。
这样设计的目的是降低NameServer 的复杂性, 在消息发送端提供容错机制来保证消息发送的高可用性。
NameServer 可以通过集群来保证高可用性,但在同一时刻有可能获取到数据是不一致的,因为不提供同步机制,但能够保证多个节点的最终一致性。NameServer 这样设计是为了简单高效。
四、NameServer 工程目录解析
工程目录结构以及解析如下:
namesrv
├─ NamesrvController.java // 执行初始化逻辑,加载配置、注册Processor等
├─ NamesrvStartup.java // NameServer的启动类, 启动netty server
├─ kvconfig
│ ├─ KVConfigManager.java // namespace和config配置管理
│ └─ KVConfigSerializeWrapper.java // 将获取到的配置json序列化
├─ processor
│ ├─ ClusterTestRequestProcessor.java //处理请求类型。
│ └─ DefaultRequestProcessor.java // 默认地请求处理器, 处理数据包
└─ routeinfo
├─ BrokerHousekeepingService.java // netty 的channel共用方法抽象
└─ RouteInfoManager.java // 路由管理器,维护topic, broker,
//clusterName, brokerAddr等信息
分析发现netty 是rocketmq 网络通信的核心,掌握netty 的常见用法是非常有必要的。
五、NameServer 启动流程分析
1) 创建NameSrvController
加载 namesrvConfig 和 nettyServerConfig, 如果有手动配置也可以生效, 使用option类封装参数,在程序运行前添加配置Program arguments, 添加的格式: 例如 -c , -p 等。
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException
....
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
return controller;
NameSrv的配置存放在 user.home\\namesrv\\ 目录下:
2) 执行initialize()加载需要的配置
NamesrvController 在执行start()方法前需要做一些准备工作,比如加载配置、创建Netty Server实例、注册请求处理器、扫描所有的失联的broker等
具体的解释如下注释:
public boolean initialize()
// 加载k,v 相关配置,含自定义配置。
this.kvConfigManager.load();
// 启动netty server, 管理channel
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// 初始化netty 线程池
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 注册netty 请求Handler, 可以通过NettyRequestProcessor接口找到其实现类
this.registerProcessor();
// 与broker建立长连接,扫描所有的broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable()
@Override
public void run()
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
, 5, 10, TimeUnit.SECONDS);
// 打印所有的config
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable()
@Override
public void run()
NamesrvController.this.kvConfigManager.printAllPeriodically();
, 1, 10, TimeUnit.MINUTES);
// 监听文件里的配置是否修改
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED)
// Register a listener to reload SslContext
try
fileWatchService = new FileWatchService(
new String[]
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
,
new FileWatchService.Listener()
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path)
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath))
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
if (path.equals(TlsSystemConfig.tlsServerCertPath))
certChanged = true;
if (path.equals(TlsSystemConfig.tlsServerKeyPath))
keyChanged = true;
if (certChanged && keyChanged)
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
private void reloadServerSslContext()
((NettyRemotingServer) remotingServer).loadSslContext();
);
catch (Exception e)
log.warn("FileWatchService created error, can't load the certificate dynamically");
return true;
如果initialize()方法返回false, 那么需要检查一些相关配置是否正确, 返回true后,就可以执行最后一步controller.start()方法, 该方法表示NameServer正式启动。
3) 启动server
接下来看下源代码分析start()方法做了哪些事
public void start() throws Exception
// 1. 启动netty server
this.remotingServer.start();
// 2. 启动文件扫描线程,监听核心配置是否修改。
if (this.fileWatchService != null)
this.fileWatchService.start();
可以通过debug发现,首先会进入到NettyRemotingServer类里的start()方法, 该方法实现了nettyServer, 初始化netty的线程组和实例化 ServerBootStrap。
然后开启一个线程执行FileWatchService 的run()方法:
通过此线程扫描配置文件是否被修改, NameServer启动成功后,会在控制台打印 boot success的字样。
六、NameServer核心源码解析
1. 路由注册
1) broker向NameServer 发送心跳包
找到brokerController的start()方法里,broker 通过 BrokerController.this.registerBrokerAll(true,false) 方法来向NameServer 发送心跳包,其中使用定时任务 sheduledExecutorService 线程池定时发送,每隔30s 发送一次, brokerConfig.getRegisterNameServerPeriod() 的默认值为30s。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable()
@Override
public void run()
try
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
catch (Throwable e)
log.error("registerBrokerAll Exception", e);
, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
然后进入到doRegisterBrokerAll()方法,找到BrokerOuterApi里的registerBrokerAll()方法, 用RegiterBrokerRequestHeader类封装broker相关的信息, RegiterBrokerRequestHeader 主要属性如下:
- brokerName: broker名称。
- brokerAddr: broker的地址。
- cluterName: broker所在集群的名称。
- haServerAddr: 集群master的地址。
- brokerId: brokerId为0的时候表示该broker为master, 如果大于0,表示该broker为slave。
brokerId=0为master节点的配置在MixALL配置中:
然后会调用到registerBrokerAll() 方法, 最终会将该broker信息注册到所有的NameServer上。
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
final boolean compressed)
final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0)
// 封装broker信息
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
// 等待所有的NameServer都含有broker信息后,才表示执行完毕。
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
// 把该broker的信息注册到所有的NameServer上。
for (final String namesrvAddr : nameServerAddressList)
brokerOuterExecutor.execute(new Runnable()
@Override
public void run()
try
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
if (result != null)
registerBrokerResultList.add(result);
log.info("register broker[]to name server OK", brokerId, namesrvAddr);
catch (Exception e)
log.warn("registerBroker Exception, ", namesrvAddr, e);
finally
countDownLatch.countDown();
);
try
// 默认超时时间为6s, 在BrokerConfig里配有registerBrokerTimeoutMills=6000
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
catch (InterruptedException e)
return registerBrokerResultList;
而broker相关信息是通过netty 发送给NameServer, broker信息的请求注册方式有oneway 和同步和异步,默认发送注册请求的方式是同步的。
可以在BrokerOuterAPI类里的registerBroker(final String namesrvAddr, final boolean oneway, final int timeoutMills, final RegisterBrokerRequestHeader requestHeader, final byte[] body) 里找到通过同步的方式发送注册请求,同步的注册方式如下:
2) NameServer 处理心跳包
NameServer接收到broker发送过来的请求后,首先会在DefaultRequestProcessor 网络处理器解析请求类型,请求类型如果为RequestCode.REGISTER_BROKER, 则最终的请求会到RouteInfoManager里的registerBroker()方法。
public RemotingCommand registerBroker(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException
final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
final RegisterBrokerRequestHeader requestHeader =
(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
if (!checksum(ctx, request, requestHeader))
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("crc32 not match");
return response;
// 解析数据包
TopicConfigSerializeWrapper topicConfigWrapper;
if (request.getBody() != null)
topicConfigWrapper = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class);
else
topicConfigWrapper = new TopicConfigSerializeWrapper();
topicConfigWrapper.getDataVersion().setCounter(new AtomicLong(0));
topicConfigWrapper.getDataVersion().setTimestamp(0);
// 用RouteInfoManager 注册broker
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
topicConfigWrapper,
null,
ctx.channel()
);
// 响应broker
responseHeader.setHaServerAddr(result.getHaServerAddr());
responseHeader.setMasterAddr(result.getMasterAddr());
byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
response.setBody(jsonValue);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
RouteInfoManager 里的registerBroker方法将broker的信息最终添加到 clusterAddrTable、brokerAddrTable、brokerLiveTable、filterServerTable里。
画了一下broker的在NameSrv中的注册流程图:
2. 路由删除
RouteInfoManager 的scanNotActiveBroker ()方法, 采用了单线程定时线程池每隔10s扫描所有broker的策略, 该方法在NamesrvController里的initialize()方法里, newSingleThreadScheduledExecutor线程池里只有一个线程实例,利用此线程池能极大地减少系统资源地开销,因为扫描broker本身不需要过多的资源,开启一个线程足以。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable()
@Override
public void run()
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
, 5, 10, TimeUnit.SECONDS);
NameServer是如何判定broker失效的呢?
继续跟着源码进入到scanNotActiveBroker()方法, 判定失效的逻辑是: 如果当前时间戳- 上一次更新的时间戳> 120s。那么判断该broker是失效的。 BROKER_CHANNEL_EXPIRED_TIME默认值为120s。因为broker每隔30s会给NameServer发送一次心跳信息,因此此方式可以判定broker是否失效。
public void scanNotActiveBroker()
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext())
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis())
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
接着会将broker相关的信息从brokerLiveTable中移除掉,同时销毁掉netty对应的channel。brokerLiveTable是一个hashmap,归RouteInfoManager类持有。
3. 路由发现
RocketMQ的路由发现是非实时的,当Topic路由发生变化时,NameServer不主动推送给客户端,而是由客户端定时拉取主题最新的路由。根据主题拉取最新路由的编码为: GET_ROUTEINFO_BY_TOPIC。
我们可以找到DefaultRequestProcessor处理器里的processRequest()方法,该方法用来处理Netty请求, 该方法有个判断,当request里的code为GET_ROUTEINFO_BY_TOPIC时,执行this.getRouteInfoByTopic(ctx, request)方法。
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException
if (ctx != null)
log.debug("receive request, ",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
// 根据请求代码code来判断业务逻辑
switch (request.getCode())
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.QUERY_DATA_VERSION:
return queryBrokerTopicConfig(ctx, request);
// 注册broker
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal())
return this.registerBrokerWithFilterServer(ctx, request);
else
return this.registerBroker(ctx, request);
// 移除broker
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
// 根据topic获取路由
case RequestCode.GET_ROUTEINFO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request);
case RequestCode.UPDATE_NAMESRV_CONFIG:
return this.updateConfig(ctx, request);
case RequestCode.GET_NAMESRV_CONFIG:
return this.getConfig(ctx, request);
default:
break;
return null;
然后DefaultRequestProceesor类里的getRouteInfoByTopic(ctx,request)方法里主要做了两件事:
1) 根据topic从RouteInfoManager的topicQueueTable里获取到所有的QueueData和BrokerData, 然后将他们设置到topicRouteData里返回出去。
2) 判断指定的topic是否是顺序消息的topic,如果是那么给返回配置顺序消息的路由, 即给setOrderTopicConf赋值。
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
// 1. 获取到topicRouteData,包含topic所有的QueueData和BrokerData
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
if (topicRouteData != null)
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable())
// 2. 判断是否是顺序消息的topic, 如果是顺序消息,那么给该请求返回顺序消息的路由
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
requestHeader.getTopic());
topicRouteData.setOrderTopicConf(orderTopicConf);
byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
小结
1. NameServer相当于rocketmq的注册中心,能够维护并实时监听broker的地址信息和队列信息等。
2. NameServer和broker之间是基于netty通信的。
3. DefaultRequestProcessor的 getRouteInfoByTopic(ChannelHandlerContext ctx,RemotingCommand request)方法是根据topic获取到路由信息(包含topic对应的所有queue和所有的broker)、registBroker() 方法将broker信息注册到NameServer上、unregisterBroker()方法移除NameServer上的broker信息。
4. RouteInfoManager 类管理了所有的broker、cluster集群、topicQueue主题队列以及broker存活的信息。
以上是关于RocketMQ源码系列 NameServer 核心源码解析的主要内容,如果未能解决你的问题,请参考以下文章