A Nacos node is down: how do calls still work?


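On the consumer side, Nacos relies on the Feign + Ribbon interceptors. Service discovery calls NacosNamingService.getAllInstances, and the first call caches a copy of the instance list locally.

At the same time, a scheduled task with a 1 s delay is started to keep that cache updated.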
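Why a local cache helps when a Nacos node is unreachable can be sketched in plain Java. This is only an illustration of the idea, not the Nacos client's implementation: the class CachedInstanceLookup, the Function used as a stand-in for the registry query, and the refresh() method are all hypothetical names. The point it shows is that a failed refresh leaves the last known instance list in place, so callers can still resolve an address.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical sketch: serve the last known instance list when the registry cannot be reached. */
class CachedInstanceLookup {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    // Stands in for a remote registry query; assumed to throw when the registry is down.
    private final Function<String, List<String>> remoteFetch;

    CachedInstanceLookup(Function<String, List<String>> remoteFetch) {
        this.remoteFetch = remoteFetch;
    }

    /** Returns the cached instances; queries the registry only when nothing is cached yet. */
    List<String> getInstances(String serviceName) {
        List<String> cached = cache.get(serviceName);
        if (cached != null) {
            return cached;
        }
        refresh(serviceName);
        return cache.getOrDefault(serviceName, Collections.<String>emptyList());
    }

    /** Meant to be called periodically, e.g. from a scheduled task. */
    void refresh(String serviceName) {
        try {
            List<String> fresh = new ArrayList<>(remoteFetch.apply(serviceName));
            cache.put(serviceName, Collections.unmodifiableList(fresh));
        } catch (RuntimeException e) {
            // Registry unreachable: keep the previously cached list untouched.
        }
    }
}
```

In this sketch, a service that was resolved at least once stays resolvable while the registry is down. A service that was never resolved yields an empty list.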

com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientAutoConfiguration#nacosDiscoveryClient

This registers NacosDiscoveryClient, a class that implements the DiscoveryClient service-discovery interface.

com.alibaba.cloud.nacos.discovery.NacosDiscoveryClient#getInstances

This method fetches all instances of a service by calling:

com.alibaba.nacos.client.naming.NacosNamingService#selectInstances(java.lang.String, java.lang.String, boolean)

In newer versions, NacosWatch no longer does the monitoring. Extension points ······

Further notes:
public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycle

Event listener class:

public abstract class AbstractAutoServiceRegistration<R extends Registration>
      implements AutoServiceRegistration, ApplicationContextAware,
      ApplicationListener<WebServerInitializedEvent>

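ApplicationEventPublisherAware: used for publishing events.

SmartLifecycle is an interface. After the Spring container has loaded and initialized all beans, it calls back the corresponding method (start()) on every class that implements this interface.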

com.alibaba.cloud.nacos.discovery.NacosWatch#nacosServicesWatch

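Nacos source code: heartbeat detection between cluster nodes in the server-side AP architecture

How does a Nacos server learn which nodes are in the cluster when it starts? When a new node joins the cluster or an existing node goes offline, the other nodes discover this through health checks. How often do the health checks run? How does a node's state change, and what does a state change trigger?

How does a Nacos server learn which nodes are in the cluster at startup?

When a cluster is configured, the IP and port of every node are listed in the configuration file cluster.conf. On startup the Nacos server reads and parses this file. The parsing process is walked through below.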
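As a rough illustration of what reading and parsing cluster.conf involves, the sketch below turns lines of the form ip:port into normalized addresses. It is not the Nacos parser (the real work is done by MemberUtil.readServerConf). The class ClusterConfSketch, the handling of # comments and the fallback to port 8848 for entries without a port are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of parsing cluster.conf-style lines ("ip:port", one per line). */
class ClusterConfSketch {
    // Assumed fallback; matches the server.port default read in ServerMemberManager#init.
    static final int DEFAULT_PORT = 8848;

    /** Returns normalized "ip:port" addresses, skipping blank lines and '#' comments (assumed syntax). */
    static List<String> parse(List<String> lines) {
        List<String> addresses = new ArrayList<>();
        for (String raw : lines) {
            String line = raw;
            int hash = line.indexOf('#');
            if (hash >= 0) {
                line = line.substring(0, hash); // drop a trailing comment
            }
            line = line.trim();
            if (line.isEmpty()) {
                continue;
            }
            int colon = line.lastIndexOf(':');
            if (colon < 0) {
                addresses.add(line + ":" + DEFAULT_PORT); // no port given
            } else {
                int port = Integer.parseInt(line.substring(colon + 1).trim());
                addresses.add(line.substring(0, colon).trim() + ":" + port);
            }
        }
        return addresses;
    }
}
```

The sketch handles IPv4 addresses and host names only. IPv6 literals would need bracket-aware parsing.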

com.alibaba.nacos.core.cluster.ServerMemberManager#ServerMemberManager

public ServerMemberManager(ServletContext servletContext) throws Exception {
	this.serverList = new ConcurrentSkipListMap<>();
	EnvUtil.setContextPath(servletContext.getContextPath());

	init();
}

protected void init() throws NacosException {
	Loggers.CORE.info("Nacos-related cluster resource initialization");
	this.port = EnvUtil.getProperty("server.port", Integer.class, 8848);
	this.localAddress = InetUtils.getSelfIP() + ":" + port;
	this.self = MemberUtil.singleParse(this.localAddress);
	this.self.setExtendVal(MemberMetaDataConstants.VERSION, VersionUtils.version);
	serverList.put(self.getAddress(), self);

	// register NodeChangeEvent publisher to NotifyManager
	// (registers the MembersChangeEvent publisher)
	registerClusterEvent();

	// Initializes the lookup mode
	// (loads the initial member list)
	initAndStartLookup();

	if (serverList.isEmpty()) {
		throw new NacosException(NacosException.SERVER_ERROR, "cannot get serverlist, so exit.");
	}

	Loggers.CORE.info("The cluster resource is initialized");
}

ServerMemberManager#registerClusterEvent

Registers the publisher for MembersChangeEvent.

Subscribes to IPChangeEvent.

com.alibaba.nacos.core.cluster.ServerMemberManager#registerClusterEvent

private void registerClusterEvent() {
	// Register node change events
	NotifyCenter.registerToPublisher(MembersChangeEvent.class,
									 EnvUtil.getProperty("nacos.member-change-event.queue.size", Integer.class, 128));

	// The address information of this node needs to be dynamically modified
	// when registering the IP change of this node
	NotifyCenter.registerSubscriber(new Subscriber<InetUtils.IPChangeEvent>() {
		@Override
		public void onEvent(InetUtils.IPChangeEvent event) {
			String newAddress = event.getNewIP() + ":" + port;
			ServerMemberManager.this.localAddress = newAddress;
			EnvUtil.setLocalAddress(localAddress);

			Member self = ServerMemberManager.this.self;
			self.setIp(event.getNewIP());

			String oldAddress = event.getOldIP() + ":" + port;
			// Keep the server list in sync: replace the old address with the new one
			ServerMemberManager.this.serverList.remove(oldAddress);
			ServerMemberManager.this.serverList.put(newAddress, self);

			ServerMemberManager.this.memberAddressInfos.remove(oldAddress);
			ServerMemberManager.this.memberAddressInfos.add(newAddress);
		}

		@Override
		public Class<? extends Event> subscribeType() {
			return InetUtils.IPChangeEvent.class;
		}
	});
}

ServerMemberManager#initAndStartLookup

com.alibaba.nacos.core.cluster.ServerMemberManager#initAndStartLookup

private void initAndStartLookup() throws NacosException {
	this.lookup = LookupFactory.createLookUp(this);
	/**
	 * @see com.alibaba.nacos.core.cluster.lookup.FileConfigMemberLookup#start()
	 */
	this.lookup.start();
}

FileConfigMemberLookup#start

com.alibaba.nacos.core.cluster.lookup.FileConfigMemberLookup#start

public void start() throws NacosException {
	if (start.compareAndSet(false, true)) {
		// Read the cluster.conf file
		readClusterConfFromDisk();

		// Use the inotify mechanism to monitor file changes and automatically
		// trigger the reading of cluster.conf
		try {
			// Watch the configuration directory for file changes
			WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);
		} catch (Throwable e) {
			Loggers.CLUSTER.error("An exception occurred in the launch file monitor : {}", e.getMessage());
		}
	}
}

FileConfigMemberLookup#readClusterConfFromDisk

com.alibaba.nacos.core.cluster.lookup.FileConfigMemberLookup#readClusterConfFromDisk

private void readClusterConfFromDisk() {
	Collection<Member> tmpMembers = new ArrayList<>();
	try {
		List<String> tmp = EnvUtil.readClusterConf();
		// Parse the lines of cluster.conf into Member objects
		tmpMembers = MemberUtil.readServerConf(tmp);
	} catch (Throwable e) {
		Loggers.CLUSTER
			.error("nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}", e.getMessage());
	}

	// Hand the members over; a MembersChangeEvent is published if the list changed
	afterLookup(tmpMembers);
}

AbstractMemberLookup#afterLookup

com.alibaba.nacos.core.cluster.AbstractMemberLookup#afterLookup

public void afterLookup(Collection<Member> members) {
	this.memberManager.memberChange(members);
}

ServerMemberManager#memberChange

com.alibaba.nacos.core.cluster.ServerMemberManager#memberChange

synchronized boolean memberChange(Collection<Member> members) {

	if (members == null || members.isEmpty()) {
		return false;
	}

	// Check whether this node is part of the given member list
	boolean isContainSelfIp = members.stream()
		.anyMatch(ipPortTmp -> Objects.equals(localAddress, ipPortTmp.getAddress()));

	if (isContainSelfIp) {
		isInIpList = true;
	} else {
		isInIpList = false;
		// This node is not in the list, so add it
		members.add(this.self);
		Loggers.CLUSTER.warn("[serverlist] self ip {} not in serverlist {}", self, members);
	}

	// If the number of old and new clusters is different, the cluster information
	// must have changed; if the number of clusters is the same, then compare whether
	// there is a difference; if there is a difference, then the cluster node changes
	// are involved and all recipients need to be notified of the node change event

	// Determine whether the cluster membership has changed
	boolean hasChange = members.size() != serverList.size();
	ConcurrentSkipListMap<String, Member> tmpMap = new ConcurrentSkipListMap<>();
	Set<String> tmpAddressInfo = new ConcurrentHashSet<>();
	for (Member member : members) {
		final String address = member.getAddress();

		if (!serverList.containsKey(address)) {
			hasChange = true;
			// If the cluster information in cluster.conf or address-server has been changed,
			// while the corresponding nacos-server has not been started yet, the member's state
			// should be set to DOWN. If the corresponding nacos-server has been started, the
			// member's state will be set to UP after detection in a few seconds.
			member.setState(NodeState.DOWN);
		} else {
			//fix issue # 4925
			member.setState(serverList.get(address).getState());
		}

		// Ensure that the node is created only once
		tmpMap.put(address, member);
		if (NodeState.UP.equals(member.getState())) {
			tmpAddressInfo.add(address);
		}
	}

	serverList = tmpMap;
	memberAddressInfos = tmpAddressInfo;

	Collection<Member> finalMembers = allMembers();

	Loggers.CLUSTER.warn("[serverlist] updated to : {}", finalMembers);

	// Persist the current cluster node information to cluster.conf
	// <important> need to put the event publication into a synchronized block to ensure
	// that the event publication is sequential
	if (hasChange) {
		MemberUtil.syncToFile(finalMembers);
		// Publish a MembersChangeEvent
		Event event = MembersChangeEvent.builder().members(finalMembers).build();
		NotifyCenter.publishEvent(event);
	}

	return hasChange;
}
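The change-detection rule in memberChange can be isolated into a small, dependency-free sketch: the list has changed if the member count differs or if any incoming address is not yet known. MemberChangeSketch and its use of plain address strings instead of Member objects are simplifications made for illustration.

```java
import java.util.Collection;
import java.util.Set;

/** Hypothetical sketch of the "has the member list changed?" rule. */
class MemberChangeSketch {
    /**
     * Returns true if the updated address list differs from the current one.
     * Assumes the updated collection contains no duplicate addresses.
     */
    static boolean hasChange(Set<String> current, Collection<String> updated) {
        if (updated.size() != current.size()) {
            return true; // different member count: the cluster must have changed
        }
        for (String address : updated) {
            if (!current.contains(address)) {
                return true; // same count, but an unknown address appeared
            }
        }
        return false;
    }
}
```

When the sizes are equal, checking only for unknown addresses is enough. An incoming list of the same size in which every address is already known must be the same set, given that it has no duplicates.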

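How do cluster nodes keep a heartbeat with each other?

ServerMemberManager listens for the WebServerInitializedEvent that Spring Boot publishes during startup, and then starts MemberInfoReportTask, the health-check task that runs between cluster nodes.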

com.alibaba.nacos.core.cluster.ServerMemberManager#onApplicationEvent

public void onApplicationEvent(WebServerInitializedEvent event) {
	getSelf().setState(NodeState.UP);
	if (!EnvUtil.getStandaloneMode()) {
		// Schedule the heartbeat (info report) task between server nodes with a 5 s delay
		GlobalExecutor.scheduleByCommon(this.infoReportTask, 5_000L);
	}
	EnvUtil.setPort(event.getWebServer().getPort());
	EnvUtil.setLocalAddress(this.localAddress);
	Loggers.CLUSTER.info("This node is ready to provide external services");
}

The execution of MemberInfoReportTask is analyzed next.

Task#run

MemberInfoReportTask extends Task, and Task implements the Runnable interface. Task#run calls the subclass's executeBody() method.
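This is the template-method pattern: the base class fixes the run skeleton and the subclass supplies only the body. A stripped-down, dependency-free sketch of that structure follows. SketchTask and CountingTask are hypothetical names, and the counters merely stand in for whatever a real subclass does in executeBody() and after().

```java
/** Hypothetical skeleton with the same run / executeBody / after structure. */
abstract class SketchTask implements Runnable {
    protected volatile boolean shutdown = false;

    @Override
    public void run() {
        if (shutdown) {
            return;
        }
        try {
            executeBody();
        } catch (Throwable t) {
            System.err.println("task failed: " + t); // report the error and carry on
        } finally {
            if (!shutdown) {
                after();
            }
        }
    }

    /** The actual work, supplied by the subclass. */
    protected abstract void executeBody();

    /** Hook invoked after every run unless the task was shut down. */
    protected void after() {
    }

    public void shutdown() {
        shutdown = true;
    }
}

/** Example subclass that counts invocations and fails once on purpose. */
class CountingTask extends SketchTask {
    int bodyRuns = 0;
    int afterRuns = 0;

    @Override
    protected void executeBody() {
        bodyRuns++;
        if (bodyRuns == 2) {
            throw new IllegalStateException("simulated failure");
        }
    }

    @Override
    protected void after() {
        afterRuns++;
    }
}
```

Because after() sits in the finally block, it still runs when executeBody() throws. That makes it a natural place for follow-up work that must not be skipped after a failed run.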

com.alibaba.nacos.core.cluster.Task#run

    public void run() {
        if (shutdown) {
            return;
        }
        try {
            executeBody();
        } catch (Throwable t) {
            Loggers.CORE.error("this task execute has error : {}", ExceptionUtil.getStackTrace(t));
        } finally {
            if (!shutdown) {
                after();
            }
        }
    }

MemberInfoReportTask#executeBody

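Each run picks one node from the other cluster members, advancing a cursor in round-robin order, and sends it a heartbeat. Over successive runs every node is reached.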
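The target selection in executeBody, cursor = (cursor + 1) % members.size(), is a round-robin over the other members. A minimal sketch of just that step follows. RoundRobinCursor is a hypothetical helper, and plain strings stand in for Member objects.

```java
import java.util.List;

/** Hypothetical sketch of cursor-based round-robin target selection. */
class RoundRobinCursor {
    private int cursor = 0;

    /** Advances the cursor and returns the next target, or null when there are no peers. */
    String next(List<String> peers) {
        if (peers.isEmpty()) {
            return null;
        }
        // The modulo wraps the cursor around and keeps it valid if the list shrank.
        cursor = (cursor + 1) % peers.size();
        return peers.get(cursor);
    }
}
```

With three peers and a fresh cursor, the targets come out as the second, third, first, then second peer again, so each peer is contacted once every peers.size() runs.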

com.alibaba.nacos.core.cluster.ServerMemberManager.MemberInfoReportTask#executeBody

protected void executeBody() {
	// All members except this node
	List<Member> members = ServerMemberManager.this.allMembersWithoutSelf();

	if (members.isEmpty()) {
		return;
	}

	// Advance the cursor by one on each run (round-robin)
	this.cursor = (this.cursor + 1) % members.size();
	Member target = members.get(cursor);

	Loggers.CLUSTER.debug("report the metadata to the node : {}", target.getAddress());

	// /nacos/v1/core/cluster/report
	final String url = HttpUtils
		.buildUrl(false, target.getAddress(), EnvUtil.getContextPath(), Commons.NACOS_CORE_CONTEXT,
				  "/cluster/report");

	try {
		Header header = Header.newInstance().addParam(Constants.NACOS_SERVER_HEADER, VersionUtils.version);
		AuthHeaderUtil.addIdentityToHeader(header);
		asyncRestTemplate
			.post(url, header,
				  Query.EMPTY, getSelf(), reference.getType(), new Callback<String>() {
					  @Override
					  public void onReceive(RestResult<String> result) {
						  if (result.getCode() == HttpStatus.NOT_IMPLEMENTED.value()
							  || result.getCode() == HttpStatus.NOT_FOUND.value()) {
							  Loggers.CLUSTER
								  .warn("{} version is too low, it is recommended to upgrade the version : {}",
										target, VersionUtils.version);
							  return;
						  }
						  if (result.ok()) {
							  // Success
							  MemberUtil.onSuccess(ServerMemberManager.this, target);
						  } else {
							  Loggers.CLUSTER
								  .warn("failed to report new info to target node : {}, result : {}",
										target.getAddress(), result);
							  // Failure
							  MemberUtil.onFail(ServerMemberManager.this, target);
						  }
					  }

					  @Override
					  public void onError(Throwable throwable) {
						  Loggers.CLUSTER
							  .error("failed to report new info to target node : {}, error : {}",
									 target.getAddress(),
									 ExceptionUtil.getAllExceptionMsg(throwable));
						  // Failure
						  MemberUtil.onFail(ServerMemberManager.this, target, throwable);
					  }

					  @Override
					  public void onCancel() {

					  }
				  });
	} catch (Throwable ex) {
		Loggers.CLUSTER.error("failed to report new info to target node : {}, error : {}", target.getAddress(),
							  ExceptionUtil.getAllExceptionMsg(ex));
	}
}

MemberUtil#onSuccess

com.alibaba.nacos.core.cluster.MemberUtil#onSuccess

public static void onSuccess(final ServerMemberManager manager, final Member member) {
	final NodeState old = member.getState();
	manager.getMemberAddressInfos().add(member.getAddress());
	// Mark the node as UP and reset its failure counter
	member.setState(NodeState.UP);
	member.setFailAccessCnt(0);
	if (!Objects.equals(old, member.getState())) {
		// The state changed, so publish a MembersChangeEvent
		manager.notifyMemberChange();
	}
}

MemberUtil#onFail

com.alibaba.nacos.core.cluster.MemberUtil#onFail(com.alibaba.nacos.core.cluster.ServerMemberManager, com.alibaba.nacos.core.cluster.Member, java.lang.Throwable)

public static void onFail(final ServerMemberManager manager, final Member member, Throwable ex) {
	manager.getMemberAddressInfos().remove(member.getAddress());
	final NodeState old = member.getState();
	// Mark the node as SUSPICIOUS
	member.setState(NodeState.SUSPICIOUS);
	// ...
