Nacos源码之服务端AP架构集群节点的心跳检测
Posted morris131
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Nacos源码之服务端AP架构集群节点的心跳检测相关的知识,希望对你有一定的参考价值。
当Nacos服务端启动时怎么知道集群中有哪些节点?当新的节点加入集群或者集群中有节点下线了,集群之间可以通过健康检查发现。健康检查的频率是怎么样的?节点的状态又是如何变动的?状态的变动又会触发什么动作。
当Nacos服务端启动时怎么知道集群中有哪些节点?
在配置集群时,会在配置文件cluster.conf中指定集群中各个节点的IP和端口,Nacos服务端启动时会读取这个配置文件并解析,下面来看看这个解析过程。
com.alibaba.nacos.core.cluster.ServerMemberManager#ServerMemberManager
/**
 * Creates the cluster member manager and bootstraps the local member list.
 *
 * @param servletContext used to obtain the servlet context path for EnvUtil
 * @throws Exception if initialization (cluster.conf parsing, lookup start) fails
 */
public ServerMemberManager(ServletContext servletContext) throws Exception {
    this.serverList = new ConcurrentSkipListMap<>();
    EnvUtil.setContextPath(servletContext.getContextPath());
    init();
}

/**
 * Initializes this node's own Member entry, registers cluster events and
 * starts the member lookup (e.g. reading cluster.conf).
 *
 * @throws NacosException if no server list could be resolved
 */
protected void init() throws NacosException {
    Loggers.CORE.info("Nacos-related cluster resource initialization");
    this.port = EnvUtil.getProperty("server.port", Integer.class, 8848);
    this.localAddress = InetUtils.getSelfIP() + ":" + port;
    this.self = MemberUtil.singleParse(this.localAddress);
    this.self.setExtendVal(MemberMetaDataConstants.VERSION, VersionUtils.version);
    // this node is always a member of its own server list
    serverList.put(self.getAddress(), self);

    // register the MembersChangeEvent publisher to NotifyCenter
    registerClusterEvent();

    // initialize the lookup mode (e.g. FileConfigMemberLookup reads cluster.conf)
    initAndStartLookup();

    if (serverList.isEmpty()) {
        throw new NacosException(NacosException.SERVER_ERROR, "cannot get serverlist, so exit.");
    }

    Loggers.CORE.info("The cluster resource is initialized");
}
ServerMemberManager#registerClusterEvent
注册MembersChangeEvent的Publisher。
监听IPChangeEvent事件。
com.alibaba.nacos.core.cluster.ServerMemberManager#registerClusterEvent
private void registerClusterEvent()
// Register node change events
NotifyCenter.registerToPublisher(MembersChangeEvent.class,
EnvUtil.getProperty("nacos.member-change-event.queue.size", Integer.class, 128));
// The address information of this node needs to be dynamically modified
// when registering the IP change of this node
NotifyCenter.registerSubscriber(new Subscriber<InetUtils.IPChangeEvent>()
@Override
public void onEvent(InetUtils.IPChangeEvent event)
String newAddress = event.getNewIP() + ":" + port;
ServerMemberManager.this.localAddress = newAddress;
EnvUtil.setLocalAddress(localAddress);
Member self = ServerMemberManager.this.self;
self.setIp(event.getNewIP());
String oldAddress = event.getOldIP() + ":" + port;
// 维护服务列表
ServerMemberManager.this.serverList.remove(oldAddress);
ServerMemberManager.this.serverList.put(newAddress, self);
ServerMemberManager.this.memberAddressInfos.remove(oldAddress);
ServerMemberManager.this.memberAddressInfos.add(newAddress);
@Override
public Class<? extends Event> subscribeType()
return InetUtils.IPChangeEvent.class;
);
ServerMemberManager#initAndStartLookup
com.alibaba.nacos.core.cluster.ServerMemberManager#initAndStartLookup
private void initAndStartLookup() throws NacosException
this.lookup = LookupFactory.createLookUp(this);
/**
* @see com.alibaba.nacos.core.cluster.lookup.FileConfigMemberLookup#start()
*/
this.lookup.start();
FileConfigMemberLookup#start
com.alibaba.nacos.core.cluster.lookup.FileConfigMemberLookup#start
public void start() throws NacosException
if (start.compareAndSet(false, true))
// 读取cluster.conf文件
readClusterConfFromDisk();
// Use the inotify mechanism to monitor file changes and automatically
// trigger the reading of cluster.conf
try
// 监听文件的变化
WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);
catch (Throwable e)
Loggers.CLUSTER.error("An exception occurred in the launch file monitor : ", e.getMessage());
FileConfigMemberLookup#readClusterConfFromDisk
com.alibaba.nacos.core.cluster.lookup.FileConfigMemberLookup#readClusterConfFromDisk
private void readClusterConfFromDisk()
Collection<Member> tmpMembers = new ArrayList<>();
try
List<String> tmp = EnvUtil.readClusterConf();
// 读取cluster.conf文件
tmpMembers = MemberUtil.readServerConf(tmp);
catch (Throwable e)
Loggers.CLUSTER
.error("nacos-XXXX [serverlist] failed to get serverlist from disk!, error : ", e.getMessage());
/**
* 发布MembersChangeEvent事件
*/
afterLookup(tmpMembers);
AbstractMemberLookup#afterLookup
com.alibaba.nacos.core.cluster.AbstractMemberLookup#afterLookup
public void afterLookup(Collection<Member> members)
this.memberManager.memberChange(members);
ServerMemberManager#memberChange
com.alibaba.nacos.core.cluster.ServerMemberManager#memberChange
synchronized boolean memberChange(Collection<Member> members)
if (members == null || members.isEmpty())
return false;
// 判断自己是否在集群中
boolean isContainSelfIp = members.stream()
.anyMatch(ipPortTmp -> Objects.equals(localAddress, ipPortTmp.getAddress()));
if (isContainSelfIp)
isInIpList = true;
else
isInIpList = false;
// 如果自己不在集群中,把自己加入
members.add(this.self);
Loggers.CLUSTER.warn("[serverlist] self ip not in serverlist ", self, members);
// If the number of old and new clusters is different, the cluster information
// must have changed; if the number of clusters is the same, then compare whether
// there is a difference; if there is a difference, then the cluster node changes
// are involved and all recipients need to be notified of the node change event
// 判断集群的状态是否已变更
boolean hasChange = members.size() != serverList.size();
ConcurrentSkipListMap<String, Member> tmpMap = new ConcurrentSkipListMap<>();
Set<String> tmpAddressInfo = new ConcurrentHashSet<>();
for (Member member : members)
final String address = member.getAddress();
if (!serverList.containsKey(address))
hasChange = true;
// If the cluster information in cluster.conf or address-server has been changed,
// while the corresponding nacos-server has not been started yet, the member's state
// should be set to DOWN. If the corresponding nacos-server has been started, the
// member's state will be set to UP after detection in a few seconds.
member.setState(NodeState.DOWN);
else
//fix issue # 4925
member.setState(serverList.get(address).getState());
// Ensure that the node is created only once
tmpMap.put(address, member);
if (NodeState.UP.equals(member.getState()))
tmpAddressInfo.add(address);
serverList = tmpMap;
memberAddressInfos = tmpAddressInfo;
Collection<Member> finalMembers = allMembers();
Loggers.CLUSTER.warn("[serverlist] updated to : ", finalMembers);
// Persist the current cluster node information to cluster.conf
// <important> need to put the event publication into a synchronized block to ensure
// that the event publication is sequential
if (hasChange)
MemberUtil.syncToFile(finalMembers);
// 发布MembersChangeEvent事件
Event event = MembersChangeEvent.builder().members(finalMembers).build();
NotifyCenter.publishEvent(event);
return hasChange;
集群间的节点怎么维持心跳?
ServerMemberManager监听了Spring Boot启动过程中发出的WebServerInitializedEvent事件,然后启动集群节点之间的健康检查任务MemberInfoReportTask。
com.alibaba.nacos.core.cluster.ServerMemberManager#onApplicationEvent
public void onApplicationEvent(WebServerInitializedEvent event)
getSelf().setState(NodeState.UP);
if (!EnvUtil.getStandaloneMode())
// 发送服务节点之间的心跳包
GlobalExecutor.scheduleByCommon(this.infoReportTask, 5_000L);
EnvUtil.setPort(event.getWebServer().getPort());
EnvUtil.setLocalAddress(this.localAddress);
Loggers.CLUSTER.info("This node is ready to provide external services");
下面分析MemberInfoReportTask任务的执行过程。
Task#run
MemberInfoReportTask实现了Task,Task实现了Runnable接口,在Task中会调用子类的方法executeBody()。
com.alibaba.nacos.core.cluster.Task#run
public void run()
if (shutdown)
return;
try
executeBody();
catch (Throwable t)
Loggers.CORE.error("this task execute has error : ", ExceptionUtil.getStackTrace(t));
finally
if (!shutdown)
after();
MemberInfoReportTask#executeBody
遍历集群中的所有的节点,给每个节点发送心跳包。
com.alibaba.nacos.core.cluster.ServerMemberManager.MemberInfoReportTask#executeBody
protected void executeBody()
// 获取除自己外的所有节点
List<Member> members = ServerMemberManager.this.allMembersWithoutSelf();
if (members.isEmpty())
return;
// 每次+1
this.cursor = (this.cursor + 1) % members.size();
Member target = members.get(cursor);
Loggers.CLUSTER.debug("report the metadata to the node : ", target.getAddress());
// /nacos/v1/core/cluster/report
final String url = HttpUtils
.buildUrl(false, target.getAddress(), EnvUtil.getContextPath(), Commons.NACOS_CORE_CONTEXT,
"/cluster/report");
try
Header header = Header.newInstance().addParam(Constants.NACOS_SERVER_HEADER, VersionUtils.version);
AuthHeaderUtil.addIdentityToHeader(header);
asyncRestTemplate
.post(url, header,
Query.EMPTY, getSelf(), reference.getType(), new Callback<String>()
@Override
public void onReceive(RestResult<String> result)
if (result.getCode() == HttpStatus.NOT_IMPLEMENTED.value()
|| result.getCode() == HttpStatus.NOT_FOUND.value())
Loggers.CLUSTER
.warn(" version is too low, it is recommended to upgrade the version : ",
target, VersionUtils.version);
return;
if (result.ok())
// 成功
MemberUtil.onSuccess(ServerMemberManager.this, target);
else
Loggers.CLUSTER
.warn("failed to report new info to target node : , result : ",
target.getAddress(), result);
// 失败
MemberUtil.onFail(ServerMemberManager.this, target);
@Override
public void onError(Throwable throwable)
Loggers.CLUSTER
.error("failed to report new info to target node : , error : ",
target.getAddress(),
ExceptionUtil.getAllExceptionMsg(throwable));
// 失败
MemberUtil.onFail(ServerMemberManager.this, target, throwable);
@Override
public void onCancel()
);
catch (Throwable ex)
Loggers.CLUSTER.error("failed to report new info to target node : , error : ", target.getAddress(),
ExceptionUtil.getAllExceptionMsg(ex));
MemberUtil#onSuccess
com.alibaba.nacos.core.cluster.MemberUtil#onSuccess
public static void onSuccess(final ServerMemberManager manager, final Member member)
final NodeState old = member.getState();
manager.getMemberAddressInfos().add(member.getAddress());
// 将节点状态改为UP
member.setState(NodeState.UP);
member.setFailAccessCnt(0);
if (!Objects.equals(old, member.getState()))
// 发布MembersChangeEvent事件
manager.notifyMemberChange();
MemberUtil#onFail
com.alibaba.nacos.core.cluster.MemberUtil#onFail(com.alibaba.nacos.core.cluster.ServerMemberManager, com.alibaba.nacos.core.cluster.Member, java.lang.Throwable)
public static void onFail(final ServerMemberManager manager, final Member member, Throwable ex)
manager.getMemberAddressInfos().remove(member.getAddress());
final NodeState old = member.getState();
// 将节点状态改为SUSPICIOUS
member.setState(NodeState.SUSPICIOUS);
member.…（注:onFail 方法的其余代码在原文抓取时被截断）

Nacos源码之服务端AP架构集群节点数据的同步
当Nacos集群部署时,临时实例数据在集群之间是如何进行同步的?
Nacos针对临时实例数据在集群之间的同步开发了Distro一致性协议,Distro一致性协议是弱一致性协议,用来保证Nacos注册中心的可用性,当临时实例注册到Nacos注册中心时,集群的实例数据并不是一致的,当通过Distro协议同步之后才最终达到一致性,所以Distro协议保证了Nacos注册中心的AP(可用性)。
新节点同步实例数据
如果nacos集群中有新的节点加入,新节点启动时就会从其他节点进行全量拉取数据。当DistroProtocol初始化时,调用startDistroTask方法进行全量拉取数据:
com.alibaba.nacos.core.distributed.distro.DistroProtocol#startDistroTask
private void startDistroTask()
if (EnvUtil.getStandaloneMode())
isInitialized = true;
return;
/**
* 开启数据校验任务
*/
startVerifyTask();
/**
* 开启加载数据任务
*/
startLoadTask();
开启加载全量数据的任务,提交了一个异步任务DistroLoadDataTask。
com.alibaba.nacos.core.distributed.distro.DistroProtocol#startLoadTask
private void startLoadTask()
DistroCallback loadCallback = new DistroCallback()
@Override
public void onSuccess()
isInitialized = true;
@Override
public void onFailed(Throwable throwable)
isInitialized = false;
;
// 立即执行
GlobalExecutor.submitLoadDataTask(
new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
下面来看DistroLoadDataTask的run()方法。run方法使用load方法加载从远程加载全量数据,如果检测到加载数据没有完成,则继续提交全量拉取数据的任务,否则进行任务的成功回调。如果加载数据发生了异常,则进行任务的失败回调。
com.alibaba.nacos.core.distributed.distro.task.load.DistroLoadDataTask#run
public void run()
try
// 加载数据
load();
if (!checkCompleted())
// 加载不成功,延迟加载数据
GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
else
loadCallback.onSuccess();
Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
catch (Exception e)
loadCallback.onFailed(e);
Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
com.alibaba.nacos.core.distributed.distro.task.load.DistroLoadDataTask#load
private void load() throws Exception
// 在服务启动的时候,是没有其他远程服务的地址的,如果服务地址都是空的,则进行等待,直到服务地址不为空。
while (memberManager.allMembersWithoutSelf().isEmpty())
Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
TimeUnit.SECONDS.sleep(1);
// 接着判断数据存储类型是否为空,如果为空,则进行等待,直到服务地址不为空。
while (distroComponentHolder.getDataStorageTypes().isEmpty())
Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
TimeUnit.SECONDS.sleep(1);
// 遍历所有的数据存储类型,判断loadCompletedMap是否存在数据存储类型和该类型的数据是否已经加载完成,
// 如果没有则调用loadAllDataSnapshotFromRemote进行全量数据的加载:
for (String each : distroComponentHolder.getDataStorageTypes())
if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each))
// 没完成的继续加载数据
loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
loadAllDataSnapshotFromRemote()负责从远程服务器拉取数据。
com.alibaba.nacos.core.distributed.distro.task.load.DistroLoadDataTask#loadAllDataSnapshotFromRemote
private boolean loadAllDataSnapshotFromRemote(String resourceType)
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == transportAgent || null == dataProcessor)
Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type , transportAgent: , dataProcessor: ",
resourceType, transportAgent, dataProcessor);
return false;
// 遍历所有的远程服务地址,除了自己
for (Member each : memberManager.allMembersWithoutSelf())
try
Loggers.DISTRO.info("[DISTRO-INIT] load snapshot from ", resourceType, each.getAddress());
/**
* 从远程获取所有的数据
*/
DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
// 处理数据
boolean result = dataProcessor.processSnapshot(distroData);
Loggers.DISTRO
.info("[DISTRO-INIT] load snapshot from result: ", resourceType, each.getAddress(),
result);
if (result)
return true;
catch (Exception e)
Loggers.DISTRO.error("[DISTRO-INIT] load snapshot from failed.", resourceType, each.getAddress(), e);
return false;
loadAllDataSnapshotFromRemote方法做了两件事:
- 通过http请求拉取远程服务的所有全量数据:拉取数据的接口为:/distro/v1/ns/distro/datums
- 处理拉取回来的全量数据
处理全量数据的方法为processData():
com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#processData(byte[])
private boolean processData(byte[] data) throws Exception
if (data.length > 0)
// 反序列化数据
Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class);
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet())
dataStore.put(entry.getKey(), entry.getValue());
if (!listeners.containsKey(entry.getKey()))
// pretty sure the service not exist:
if (switchDomain.isDefaultInstanceEphemeral())
// create empty service
Loggers.DISTRO.info("creating service ", entry.getKey());
Service service = new Service();
String serviceName = KeyBuilder.getServiceName(entry.getKey());
String namespaceId = KeyBuilder.getNamespace(entry.getKey());
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(Constants.DEFAULT_GROUP);
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
// The Listener corresponding to the key value must not be empty
RecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek();
if (Objects.isNull(listener))
return false;
listener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet())
if (!listeners.containsKey(entry.getKey()))
// Should not happen:
Loggers.DISTRO.warn("listener of not found.", entry.getKey());
continue;
try
for (RecordListener listener : listeners.get(entry.getKey()))
/**
* @see Service#onChange(java.lang.String, com.alibaba.nacos.naming.core.Instances)
*/
listener.onChange(entry.getKey(), entry.getValue().value);
catch (Exception e)
Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: ", entry.getKey(), e);
continue;
// Update data store if listener executed successfully:
dataStore.put(entry.getKey(), entry.getValue());
return true;
最后会调用到Service.onChange()方法,与实例的注册一样调用此方法更新注册表。
再来看看远程服务是如何处理全量拉取数据的请求的:
com.alibaba.nacos.naming.controllers.DistroController#getAllDatums
@GetMapping("/datums")
public ResponseEntity getAllDatums()
// 服务端查询所有实例数据的入口
DistroData distroData = distroProtocol.onSnapshot(KeyBuilder.INSTANCE_LIST_KEY_PREFIX);
return ResponseEntity.ok(distroData.getContent());
com.alibaba.nacos.core.distributed.distro.DistroProtocol#onSnapshot
public DistroData onSnapshot(String type)
DistroDataStorage distroDataStorage = distroComponentHolder.findDataStorage(type);
if (null == distroDataStorage)
Loggers.DISTRO.warn("[DISTRO] Can't find data storage for received key ", type);
return new DistroData(new DistroKey("snapshot", type), new byte[0]);
// 查询所有的实例数据
return distroDataStorage.getDatumSnapshot();
com.alibaba.nacos.naming.consistency.ephemeral.distro.component.DistroDataStorageImpl#getDatumSnapshot
public DistroData getDatumSnapshot()
// 从缓存中获取所有的实例数据
Map<String, Datum> result = dataStore.getDataMap();
byte[] dataContent = ApplicationUtils.getBean(Serializer.class).serialize(result);
DistroKey distroKey = new DistroKey("snapshot", KeyBuilder.INSTANCE_LIST_KEY_PREFIX);
return new DistroData(distroKey, dataContent);
全量数据拉取无非就是从内存dataStore中获取所有临时实例的数据,并且对数据进行序列化,然后返回给客户端。
数据校验任务
Nacos AP集群为保证数据的最终一致性会开启一个数据校验的定时任务来检查各个节点之间的数据是否一致,不一致就会进行数据的同步更新。
数据校验任务DistroVerifyTask与同步全量数据任务DistroLoadDataTask同样是在DistroProtocol实例化是创建的。
com.alibaba.nacos.core.distributed.distro.DistroProtocol#startDistroTask
private void startDistroTask()
if (EnvUtil.getStandaloneMode())
isInitialized = true;
return;
/**
* 开启数据校验任务
*/
startVerifyTask();
/**
* 开启加载数据任务
*/
startLoadTask();
private void startVerifyTask()
// 5s执行一次数据校验任务
GlobalExecutor.schedulePartitionDataTimedSync(new DistroVerifyTask(memberManager, distroComponentHolder),
distroConfig.getVerifyIntervalMillis());
数据校验任务默认5s执行一次,会将缓存中所有的key调用接口/nacos/v1/ns/distro/checksum发送给远程服务器。
com.alibaba.nacos.core.distributed.distro.task.verify.DistroVerifyTask#run
public void run()
try
// 获取集群中的其他节点
List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
if (Loggers.DISTRO.isDebugEnabled())
Loggers.DISTRO.debug("server list is: ", targetServer);
for (String each : distroComponentHolder.getDataStorageTypes())
// 校验数据
verifyForDataStorage(each, targetServer);
catch (Exception e)
Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);
private void verifyForDataStorage(String type, List<Member> targetServer)
/**
* @see com.alibaba.nacos.naming.consistency.ephemeral.distro.component.DistroDataStorageImpl#getVerifyData()
*/
DistroData distroData = distroComponentHolder.findDataStorage(type).getVerifyData();
if (null == distroData)
return;
distroData.setType(DataOperation.VERIFY);
// 遍历集群中的其他节点
for (Member member : targetServer)
try
// 发送数据校验的请求
distroComponentHolder.findTransportAgent(type).syncVerifyData(distroData, member.getAddress());
catch (Exception e)
Loggers.DISTRO.error(String
.format("[DISTRO-FAILED] verify data for type %s to %s failed.", type, member.getAddress()), e);
接下来看远程服务器端接受到数据校验任务的请求时是怎么处理的:
com.alibaba.nacos.naming.controllers.DistroController#syncChecksum
@PutMapping("/checksum")
public ResponseEntity syncChecksum(@RequestParam String source, @RequestBody Map<String, String> dataMap)
// 收到校验数据的请求
DistroHttpData distroHttpData = new DistroHttpData(createDistroKey(source), dataMap);
// 开始校验
distroProtocol.onVerify(distroHttpData);
return ResponseEntity.ok("ok");
com.alibaba.nacos.core.distributed.distro.DistroProtocol#onVerify
public boolean onVerify(DistroData distroData)
String resourceType = distroData.getDistroKey().getResourceType();
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == dataProcessor)
Loggers.DISTRO.warn("[DISTRO] Can't find verify data process for received data ", resourceType);
return false;
/**
* 处理校验数据
*/
return dataProcessor.processVerifyData(distroData);
com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#processVerifyData
public boolean processVerifyData(DistroData distroData)
DistroHttpData distroHttpData = (DistroHttpData) distroData;
String sourceServer = distroData.getDistroKey().getResourceKey();
Map<String, String> verifyData = (Map<String, String>) distroHttpData.getDeserializedContent();
// 校验数据
onReceiveChecksums(verifyData, sourceServer);
return true;
com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#onReceiveChecksums
public void onReceiveChecksums(Map<String, String> checksumMap, String server)
if (syncChecksumTasks.containsKey(server))
// Already in …（注:onReceiveChecksums 方法的其余代码在原文抓取时被截断）

以上是关于Nacos源码之服务端AP架构集群节点的心跳检测的主要内容,如果未能解决你的问题,请参考以下文章