Nacos Source Code: Heartbeat Detection Between Cluster Nodes in the Server-Side AP Architecture

Posted by morris131

How does a Nacos server know, at startup, which nodes are in the cluster? When a new node joins or an existing node goes offline, the other nodes discover this through health checks. How frequent are these health checks? How does a node's state change, and what actions does a state change trigger? This article walks through these questions in the source code.

How does a Nacos server know which nodes are in the cluster at startup?

When configuring a cluster, you specify each node's IP and port in the cluster.conf configuration file, which the Nacos server reads and parses at startup.
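For example, a three-node cluster's cluster.conf lists one ip:port per line (the addresses below are illustrative):

# cluster.conf
192.168.1.101:8848
192.168.1.102:8848
192.168.1.103:8848

The parsing starts in the ServerMemberManager constructor.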

com.alibaba.nacos.core.cluster.ServerMemberManager#ServerMemberManager

public ServerMemberManager(ServletContext servletContext) throws Exception {
    this.serverList = new ConcurrentSkipListMap<>();
    EnvUtil.setContextPath(servletContext.getContextPath());

    init();
}

protected void init() throws NacosException {
    Loggers.CORE.info("Nacos-related cluster resource initialization");
    this.port = EnvUtil.getProperty("server.port", Integer.class, 8848);
    this.localAddress = InetUtils.getSelfIP() + ":" + port;
    this.self = MemberUtil.singleParse(this.localAddress);
    this.self.setExtendVal(MemberMetaDataConstants.VERSION, VersionUtils.version);
    serverList.put(self.getAddress(), self);

    // register the MembersChangeEvent publisher to NotifyCenter
    registerClusterEvent();

    // initialize the lookup mode and load the member list
    initAndStartLookup();

    if (serverList.isEmpty()) {
        throw new NacosException(NacosException.SERVER_ERROR, "cannot get serverlist, so exit.");
    }

    Loggers.CORE.info("The cluster resource is initialized");
}

ServerMemberManager#registerClusterEvent

This method registers the publisher for MembersChangeEvent and subscribes to InetUtils.IPChangeEvent, so the member list can follow changes to the local IP.

com.alibaba.nacos.core.cluster.ServerMemberManager#registerClusterEvent

private void registerClusterEvent() {
    // Register node change events
    NotifyCenter.registerToPublisher(MembersChangeEvent.class,
            EnvUtil.getProperty("nacos.member-change-event.queue.size", Integer.class, 128));

    // The address information of this node needs to be dynamically modified
    // when registering the IP change of this node
    NotifyCenter.registerSubscriber(new Subscriber<InetUtils.IPChangeEvent>() {
        @Override
        public void onEvent(InetUtils.IPChangeEvent event) {
            String newAddress = event.getNewIP() + ":" + port;
            ServerMemberManager.this.localAddress = newAddress;
            EnvUtil.setLocalAddress(localAddress);

            Member self = ServerMemberManager.this.self;
            self.setIp(event.getNewIP());

            String oldAddress = event.getOldIP() + ":" + port;
            // maintain the server list: replace the old address with the new one
            ServerMemberManager.this.serverList.remove(oldAddress);
            ServerMemberManager.this.serverList.put(newAddress, self);

            ServerMemberManager.this.memberAddressInfos.remove(oldAddress);
            ServerMemberManager.this.memberAddressInfos.add(newAddress);
        }

        @Override
        public Class<? extends Event> subscribeType() {
            return InetUtils.IPChangeEvent.class;
        }
    });
}

ServerMemberManager#initAndStartLookup

com.alibaba.nacos.core.cluster.ServerMemberManager#initAndStartLookup

private void initAndStartLookup() throws NacosException {
    this.lookup = LookupFactory.createLookUp(this);
    /**
     * @see com.alibaba.nacos.core.cluster.lookup.FileConfigMemberLookup#start()
     */
    this.lookup.start();
}

FileConfigMemberLookup#start

com.alibaba.nacos.core.cluster.lookup.FileConfigMemberLookup#start

public void start() throws NacosException {
    if (start.compareAndSet(false, true)) {
        // read the cluster.conf file
        readClusterConfFromDisk();

        // Use the inotify mechanism to monitor file changes and automatically
        // trigger the reading of cluster.conf
        try {
            // watch the config directory for changes
            WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);
        } catch (Throwable e) {
            Loggers.CLUSTER.error("An exception occurred in the launch file monitor : {}", e.getMessage());
        }
    }
}

FileConfigMemberLookup#readClusterConfFromDisk

com.alibaba.nacos.core.cluster.lookup.FileConfigMemberLookup#readClusterConfFromDisk

private void readClusterConfFromDisk() {
    Collection<Member> tmpMembers = new ArrayList<>();
    try {
        // read the cluster.conf file
        List<String> tmp = EnvUtil.readClusterConf();
        tmpMembers = MemberUtil.readServerConf(tmp);
    } catch (Throwable e) {
        Loggers.CLUSTER
                .error("nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}", e.getMessage());
    }

    // publish the MembersChangeEvent
    afterLookup(tmpMembers);
}

AbstractMemberLookup#afterLookup

com.alibaba.nacos.core.cluster.AbstractMemberLookup#afterLookup

public void afterLookup(Collection<Member> members) {
    this.memberManager.memberChange(members);
}

ServerMemberManager#memberChange

com.alibaba.nacos.core.cluster.ServerMemberManager#memberChange

synchronized boolean memberChange(Collection<Member> members) {

    if (members == null || members.isEmpty()) {
        return false;
    }

    // check whether this node itself appears in the member list
    boolean isContainSelfIp = members.stream()
            .anyMatch(ipPortTmp -> Objects.equals(localAddress, ipPortTmp.getAddress()));

    if (isContainSelfIp) {
        isInIpList = true;
    } else {
        isInIpList = false;
        // this node is not in the list, so add itself
        members.add(this.self);
        Loggers.CLUSTER.warn("[serverlist] self ip {} not in serverlist {}", self, members);
    }

    // If the number of old and new clusters is different, the cluster information
    // must have changed; if the number of clusters is the same, then compare whether
    // there is a difference; if there is a difference, then the cluster node changes
    // are involved and all recipients need to be notified of the node change event

    // determine whether the cluster membership has changed
    boolean hasChange = members.size() != serverList.size();
    ConcurrentSkipListMap<String, Member> tmpMap = new ConcurrentSkipListMap<>();
    Set<String> tmpAddressInfo = new ConcurrentHashSet<>();
    for (Member member : members) {
        final String address = member.getAddress();

        if (!serverList.containsKey(address)) {
            hasChange = true;
            // If the cluster information in cluster.conf or address-server has been changed,
            // while the corresponding nacos-server has not been started yet, the member's state
            // should be set to DOWN. If the corresponding nacos-server has been started, the
            // member's state will be set to UP after detection in a few seconds.
            member.setState(NodeState.DOWN);
        } else {
            // fix issue #4925
            member.setState(serverList.get(address).getState());
        }

        // Ensure that the node is created only once
        tmpMap.put(address, member);
        if (NodeState.UP.equals(member.getState())) {
            tmpAddressInfo.add(address);
        }
    }

    serverList = tmpMap;
    memberAddressInfos = tmpAddressInfo;

    Collection<Member> finalMembers = allMembers();

    Loggers.CLUSTER.warn("[serverlist] updated to : {}", finalMembers);

    // Persist the current cluster node information to cluster.conf
    // <important> need to put the event publication into a synchronized block to ensure
    // that the event publication is sequential
    if (hasChange) {
        MemberUtil.syncToFile(finalMembers);
        // publish the MembersChangeEvent
        Event event = MembersChangeEvent.builder().members(finalMembers).build();
        NotifyCenter.publishEvent(event);
    }

    return hasChange;
}

How do the cluster nodes maintain heartbeats with each other?

ServerMemberManager listens for the WebServerInitializedEvent that Spring Boot publishes during startup, and then starts MemberInfoReportTask, the health-check task that runs between cluster nodes.

com.alibaba.nacos.core.cluster.ServerMemberManager#onApplicationEvent

public void onApplicationEvent(WebServerInitializedEvent event) {
    getSelf().setState(NodeState.UP);
    if (!EnvUtil.getStandaloneMode()) {
        // schedule the heartbeat task between server nodes (initial delay of 5 seconds)
        GlobalExecutor.scheduleByCommon(this.infoReportTask, 5_000L);
    }
    EnvUtil.setPort(event.getWebServer().getPort());
    EnvUtil.setLocalAddress(this.localAddress);
    Loggers.CLUSTER.info("This node is ready to provide external services");
}

The following analyzes how MemberInfoReportTask executes.

Task#run

MemberInfoReportTask extends the abstract Task class, which implements the Runnable interface. Task.run() invokes the subclass's executeBody() method and, unless the task has been shut down, calls after() when it finishes.

com.alibaba.nacos.core.cluster.Task#run

public void run() {
    if (shutdown) {
        return;
    }
    try {
        executeBody();
    } catch (Throwable t) {
        Loggers.CORE.error("this task execute has error : {}", ExceptionUtil.getStackTrace(t));
    } finally {
        if (!shutdown) {
            after();
        }
    }
}

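after() is what keeps the heartbeat loop going: in MemberInfoReportTask it simply reschedules the task itself. In Nacos 1.x it is essentially the following, which yields a report interval of roughly 2 seconds:

@Override
protected void after() {
    // reschedule this task so member info keeps being reported
    GlobalExecutor.scheduleByCommon(this, 2_000L);
}
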
MemberInfoReportTask#executeBody

Each execution picks a single node from the cluster (round-robin via a cursor, excluding itself) and reports this node's member information to it as a heartbeat.

com.alibaba.nacos.core.cluster.ServerMemberManager.MemberInfoReportTask#executeBody

protected void executeBody() {
    // all members of the cluster except this node
    List<Member> members = ServerMemberManager.this.allMembersWithoutSelf();

    if (members.isEmpty()) {
        return;
    }

    // advance the cursor by one each round (round-robin)
    this.cursor = (this.cursor + 1) % members.size();
    Member target = members.get(cursor);

    Loggers.CLUSTER.debug("report the metadata to the node : {}", target.getAddress());

    // /nacos/v1/core/cluster/report
    final String url = HttpUtils
            .buildUrl(false, target.getAddress(), EnvUtil.getContextPath(), Commons.NACOS_CORE_CONTEXT,
                    "/cluster/report");

    try {
        Header header = Header.newInstance().addParam(Constants.NACOS_SERVER_HEADER, VersionUtils.version);
        AuthHeaderUtil.addIdentityToHeader(header);
        asyncRestTemplate
                .post(url, header,
                        Query.EMPTY, getSelf(), reference.getType(), new Callback<String>() {
                            @Override
                            public void onReceive(RestResult<String> result) {
                                if (result.getCode() == HttpStatus.NOT_IMPLEMENTED.value()
                                        || result.getCode() == HttpStatus.NOT_FOUND.value()) {
                                    Loggers.CLUSTER
                                            .warn("{} version is too low, it is recommended to upgrade the version : {}",
                                                    target, VersionUtils.version);
                                    return;
                                }
                                if (result.ok()) {
                                    // the report succeeded
                                    MemberUtil.onSuccess(ServerMemberManager.this, target);
                                } else {
                                    Loggers.CLUSTER
                                            .warn("failed to report new info to target node : {}, result : {}",
                                                    target.getAddress(), result);
                                    // the report failed
                                    MemberUtil.onFail(ServerMemberManager.this, target);
                                }
                            }

                            @Override
                            public void onError(Throwable throwable) {
                                Loggers.CLUSTER
                                        .error("failed to report new info to target node : {}, error : {}",
                                                target.getAddress(),
                                                ExceptionUtil.getAllExceptionMsg(throwable));
                                // the report failed
                                MemberUtil.onFail(ServerMemberManager.this, target, throwable);
                            }

                            @Override
                            public void onCancel() {

                            }
                        });
    } catch (Throwable ex) {
        Loggers.CLUSTER.error("failed to report new info to target node : {}, error : {}", target.getAddress(),
                ExceptionUtil.getAllExceptionMsg(ex));
    }
}

MemberUtil#onSuccess

com.alibaba.nacos.core.cluster.MemberUtil#onSuccess

public static void onSuccess(final ServerMemberManager manager, final Member member) {
    final NodeState old = member.getState();
    manager.getMemberAddressInfos().add(member.getAddress());
    // mark the node as UP and reset its failure counter
    member.setState(NodeState.UP);
    member.setFailAccessCnt(0);
    if (!Objects.equals(old, member.getState())) {
        // the state changed, publish a MembersChangeEvent
        manager.notifyMemberChange();
    }
}

MemberUtil#onFail

com.alibaba.nacos.core.cluster.MemberUtil#onFail(com.alibaba.nacos.core.cluster.ServerMemberManager, com.alibaba.nacos.core.cluster.Member, java.lang.Throwable)

public static void onFail(final ServerMemberManager manager, final Member member, Throwable ex) {
    manager.getMemberAddressInfos().remove(member.getAddress());
    final NodeState old = member.getState();
    // mark the node as SUSPICIOUS and count the failure
    member.setState(NodeState.SUSPICIOUS);
    member.setFailAccessCnt(member.getFailAccessCnt() + 1);
    // ... once the failure count exceeds nacos.core.member.fail-access-cnt (default 3),
    // or the connection was refused outright, the state is set to DOWN; any state change
    // then publishes a MembersChangeEvent via manager.notifyMemberChange()
}
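For reference, the node states used above come from com.alibaba.nacos.core.cluster.NodeState; in Nacos 1.x the enum is roughly:

public enum NodeState {
    STARTING,   // the node is still starting up
    UP,         // the last health check succeeded
    SUSPICIOUS, // a health check just failed; the node may still recover
    DOWN,       // too many consecutive failures; the node is treated as offline
    ISOLATION   // the node has been isolated
}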

Nacos Source Code: Data Synchronization Between Cluster Nodes in the Server-Side AP Architecture

When Nacos is deployed as a cluster, how is ephemeral instance data synchronized between the nodes?

To synchronize ephemeral instance data across the cluster, Nacos developed the Distro consistency protocol. Distro is a weakly consistent protocol designed to preserve the availability of the Nacos registry: when an ephemeral instance registers with one node, the instance data across the cluster is momentarily inconsistent, and only after Distro propagates it do the nodes converge to the same state. Distro therefore gives the Nacos registry its AP (availability-first) character.

How a new node synchronizes instance data

When a new node joins a Nacos cluster, it performs a full data pull from the other nodes at startup. When DistroProtocol is initialized, it calls startDistroTask(), which kicks off the full pull:

com.alibaba.nacos.core.distributed.distro.DistroProtocol#startDistroTask

private void startDistroTask() {
    if (EnvUtil.getStandaloneMode()) {
        isInitialized = true;
        return;
    }
    // start the data verification task
    startVerifyTask();
    // start the data loading task
    startLoadTask();
}

startLoadTask() submits an asynchronous DistroLoadDataTask to load the full data set.
com.alibaba.nacos.core.distributed.distro.DistroProtocol#startLoadTask

private void startLoadTask() {
    DistroCallback loadCallback = new DistroCallback() {
        @Override
        public void onSuccess() {
            isInitialized = true;
        }

        @Override
        public void onFailed(Throwable throwable) {
            isInitialized = false;
        }
    };
    // execute immediately
    GlobalExecutor.submitLoadDataTask(
            new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
}

Now look at DistroLoadDataTask.run(). It calls load() to fetch the full data set from remote nodes. If loading has not completed, the task resubmits itself with a retry delay; otherwise it invokes the success callback. If loading throws an exception, the failure callback is invoked.
com.alibaba.nacos.core.distributed.distro.task.load.DistroLoadDataTask#run

public void run() {
    try {
        // load the data
        load();
        if (!checkCompleted()) {
            // not finished yet, retry after a delay
            GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
        } else {
            loadCallback.onSuccess();
            Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
        }
    } catch (Exception e) {
        loadCallback.onFailed(e);
        Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
    }
}

com.alibaba.nacos.core.distributed.distro.task.load.DistroLoadDataTask#load

private void load() throws Exception {
    // at startup the list of remote peers may still be empty;
    // wait until the server list has been initialized
    while (memberManager.allMembersWithoutSelf().isEmpty()) {
        Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
        TimeUnit.SECONDS.sleep(1);
    }
    // likewise, wait until at least one data storage type has been registered
    while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
        Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
        TimeUnit.SECONDS.sleep(1);
    }
    // for each data storage type that has not finished loading yet,
    // pull the full snapshot from a remote peer
    for (String each : distroComponentHolder.getDataStorageTypes()) {
        if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
            loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
        }
    }
}

loadAllDataSnapshotFromRemote() pulls the snapshot from a remote server.
com.alibaba.nacos.core.distributed.distro.task.load.DistroLoadDataTask#loadAllDataSnapshotFromRemote

private boolean loadAllDataSnapshotFromRemote(String resourceType) {
    DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    if (null == transportAgent || null == dataProcessor) {
        Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}",
                resourceType, transportAgent, dataProcessor);
        return false;
    }
    // try every remote peer (all members except this node)
    for (Member each : memberManager.allMembersWithoutSelf()) {
        try {
            Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());
            // fetch the full snapshot from the remote peer
            DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
            // process the snapshot data
            boolean result = dataProcessor.processSnapshot(distroData);
            Loggers.DISTRO
                    .info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(),
                            result);
            if (result) {
                return true;
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
        }
    }
    return false;
}

loadAllDataSnapshotFromRemote() does two things:

  1. pulls the full data set from a remote peer over HTTP, via the endpoint /nacos/v1/ns/distro/datums (see the sketch after this list);
  2. processes the returned snapshot.
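
As a minimal sketch of step 1 (not Nacos source), the pull is an ordinary HTTP GET; this example assumes a peer reachable at 127.0.0.1:8848 with the default /nacos context path:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class DistroSnapshotPullSketch {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://127.0.0.1:8848/nacos/v1/ns/distro/datums"))
                .GET()
                .build();
        // the response body is the serialized Map<String, Datum> produced by getDatumSnapshot()
        HttpResponse<byte[]> response = client.send(request, HttpResponse.BodyHandlers.ofByteArray());
        System.out.println("snapshot size in bytes: " + response.body().length);
    }
}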

The snapshot is processed by processData():
com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#processData(byte[])

private boolean processData(byte[] data) throws Exception {
    if (data.length > 0) {
        // deserialize the data
        Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class);

        for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
            dataStore.put(entry.getKey(), entry.getValue());

            if (!listeners.containsKey(entry.getKey())) {
                // pretty sure the service not exist:
                if (switchDomain.isDefaultInstanceEphemeral()) {
                    // create empty service
                    Loggers.DISTRO.info("creating service {}", entry.getKey());
                    Service service = new Service();
                    String serviceName = KeyBuilder.getServiceName(entry.getKey());
                    String namespaceId = KeyBuilder.getNamespace(entry.getKey());
                    service.setName(serviceName);
                    service.setNamespaceId(namespaceId);
                    service.setGroupName(Constants.DEFAULT_GROUP);
                    // now validate the service. if failed, exception will be thrown
                    service.setLastModifiedMillis(System.currentTimeMillis());
                    service.recalculateChecksum();

                    // The Listener corresponding to the key value must not be empty
                    RecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek();
                    if (Objects.isNull(listener)) {
                        return false;
                    }
                    listener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
                }
            }
        }

        for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {

            if (!listeners.containsKey(entry.getKey())) {
                // Should not happen:
                Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
                continue;
            }

            try {
                for (RecordListener listener : listeners.get(entry.getKey())) {
                    /**
                     * @see Service#onChange(java.lang.String, com.alibaba.nacos.naming.core.Instances)
                     */
                    listener.onChange(entry.getKey(), entry.getValue().value);
                }
            } catch (Exception e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
                continue;
            }

            // Update data store if listener executed successfully:
            dataStore.put(entry.getKey(), entry.getValue());
        }
    }
    return true;
}

Ultimately Service.onChange() is called, which updates the registry in the same way instance registration does.

Now look at how the remote server handles the full pull request:
com.alibaba.nacos.naming.controllers.DistroController#getAllDatums

@GetMapping("/datums")
public ResponseEntity getAllDatums() 
	// 服务端查询所有实例数据的入口
	DistroData distroData = distroProtocol.onSnapshot(KeyBuilder.INSTANCE_LIST_KEY_PREFIX);
	return ResponseEntity.ok(distroData.getContent());

com.alibaba.nacos.core.distributed.distro.DistroProtocol#onSnapshot

public DistroData onSnapshot(String type) {
    DistroDataStorage distroDataStorage = distroComponentHolder.findDataStorage(type);
    if (null == distroDataStorage) {
        Loggers.DISTRO.warn("[DISTRO] Can't find data storage for received key {}", type);
        return new DistroData(new DistroKey("snapshot", type), new byte[0]);
    }
    // query all instance data
    return distroDataStorage.getDatumSnapshot();
}

com.alibaba.nacos.naming.consistency.ephemeral.distro.component.DistroDataStorageImpl#getDatumSnapshot

public DistroData getDatumSnapshot() {
    // fetch all instance data from the in-memory store
    Map<String, Datum> result = dataStore.getDataMap();
    byte[] dataContent = ApplicationUtils.getBean(Serializer.class).serialize(result);
    DistroKey distroKey = new DistroKey("snapshot", KeyBuilder.INSTANCE_LIST_KEY_PREFIX);
    return new DistroData(distroKey, dataContent);
}

So the full snapshot pull simply reads all ephemeral instance data from the in-memory dataStore, serializes it, and returns it to the caller.

The data verification task

To guarantee eventual consistency, a Nacos AP cluster runs a periodic data verification task that checks whether the data on the nodes agrees; if not, the data is synchronized and updated.

The verification task DistroVerifyTask is created, like the full-load task DistroLoadDataTask, when DistroProtocol is instantiated.

com.alibaba.nacos.core.distributed.distro.DistroProtocol#startDistroTask

private void startDistroTask() {
    if (EnvUtil.getStandaloneMode()) {
        isInitialized = true;
        return;
    }
    // start the data verification task
    startVerifyTask();
    // start the data loading task
    startLoadTask();
}

private void startVerifyTask() {
    // run the verification task every 5 seconds
    GlobalExecutor.schedulePartitionDataTimedSync(new DistroVerifyTask(memberManager, distroComponentHolder),
            distroConfig.getVerifyIntervalMillis());
}

By default the verification task runs every 5 seconds; it sends the checksums of all keys in the local cache to each remote server via the /nacos/v1/ns/distro/checksum endpoint.
com.alibaba.nacos.core.distributed.distro.task.verify.DistroVerifyTask#run

public void run() {
    try {
        // the other members of the cluster
        List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("server list is: {}", targetServer);
        }
        for (String each : distroComponentHolder.getDataStorageTypes()) {
            // verify the data of this storage type
            verifyForDataStorage(each, targetServer);
        }
    } catch (Exception e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);
    }
}

private void verifyForDataStorage(String type, List<Member> targetServer) {
    /**
     * @see com.alibaba.nacos.naming.consistency.ephemeral.distro.component.DistroDataStorageImpl#getVerifyData()
     */
    DistroData distroData = distroComponentHolder.findDataStorage(type).getVerifyData();
    if (null == distroData) {
        return;
    }
    distroData.setType(DataOperation.VERIFY);
    // send a verification request to every other member of the cluster
    for (Member member : targetServer) {
        try {
            distroComponentHolder.findTransportAgent(type).syncVerifyData(distroData, member.getAddress());
        } catch (Exception e) {
            Loggers.DISTRO.error(String
                    .format("[DISTRO-FAILED] verify data for type %s to %s failed.", type, member.getAddress()), e);
        }
    }
}

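For the ephemeral naming data, the verification payload is a map from each datum key to its checksum. Illustratively, such a request might look like the following (the source address, key, and checksum are made up for this example):

PUT /nacos/v1/ns/distro/checksum?source=192.168.1.101:8848

{
  "com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@order-service": "59b221b6e0a9a25f58ff4b47cbf0e0f4"
}
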
Next, look at how the remote server handles an incoming verification request:

com.alibaba.nacos.naming.controllers.DistroController#syncChecksum

@PutMapping("/checksum")
public ResponseEntity syncChecksum(@RequestParam String source, @RequestBody Map<String, String> dataMap) 
	// 收到校验数据的请求
	DistroHttpData distroHttpData = new DistroHttpData(createDistroKey(source), dataMap);
	// 开始校验
	distroProtocol.onVerify(distroHttpData);
	return ResponseEntity.ok("ok");

com.alibaba.nacos.core.distributed.distro.DistroProtocol#onVerify

public boolean onVerify(DistroData distroData) {
    String resourceType = distroData.getDistroKey().getResourceType();
    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    if (null == dataProcessor) {
        Loggers.DISTRO.warn("[DISTRO] Can't find verify data process for received data {}", resourceType);
        return false;
    }
    // process the verification data
    return dataProcessor.processVerifyData(distroData);
}

com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#processVerifyData

public boolean processVerifyData(DistroData distroData) {
    DistroHttpData distroHttpData = (DistroHttpData) distroData;
    String sourceServer = distroData.getDistroKey().getResourceKey();
    Map<String, String> verifyData = (Map<String, String>) distroHttpData.getDeserializedContent();
    // verify the checksums
    onReceiveChecksums(verifyData, sourceServer);
    return true;
}

com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#onReceiveChecksums

public void onReceiveChecksums(Map<String, String> checksumMap, String server) {

    if (syncChecksumTasks.containsKey(server)) {
        // a checksum sync with this server is already in progress
        return;
    }
    // ... keys whose checksums differ from the local dataStore (or are missing locally)
    // are batch-fetched from the source server, and keys the source no longer holds
    // are removed, so the nodes eventually converge to the same data
}

This checksum comparison closes the loop on Distro's eventual consistency: periodic verification detects divergence between nodes, and the follow-up pull repairs it.