How are calls made when a Nacos node goes down?
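On the client side, Nacos relies on Feign + Ribbon interceptors. Service discovery calls NacosNamingService.getAllInstances; the first call caches the instance list locally and also starts a scheduled task, with a 1s delay, that keeps the cache updated.
com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientAutoConfiguration#nacosDiscoveryClient
registers NacosDiscoveryClient, a class that implements the DiscoveryClient service-discovery interface.
com.alibaba.cloud.nacos.discovery.NacosDiscoveryClient#getInstances
fetches all instances of a service by calling
com.alibaba.nacos.client.naming.NacosNamingService#selectInstances(java.lang.String, java.lang.String, boolean)
In newer versions NacosWatch no longer does the monitoring; it is an extension point······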
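The local cache described above is what allows callers to keep resolving instances while a Nacos node is unreachable. The following is a minimal sketch of that idea, not the Nacos client's actual code: `CachedInstanceLookup`, its `registry` function, and `refresh` are hypothetical names, and the refresh is triggered by hand here rather than by a scheduled task.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical illustration; not part of the Nacos client API.
final class CachedInstanceLookup {
    // Stand-in for the remote call to the registry (assumption).
    private final Function<String, List<String>> registry;
    // Last known instance list per service name.
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();

    CachedInstanceLookup(Function<String, List<String>> registry) {
        this.registry = registry;
    }

    // Serves from the cache; only the first lookup of a service hits the registry.
    List<String> getInstances(String service) {
        List<String> cached = cache.get(service);
        if (cached == null) {
            refresh(service);
            cached = cache.get(service);
        }
        return cached != null ? cached : Collections.<String>emptyList();
    }

    // Would be driven by a scheduled task in a real client.
    void refresh(String service) {
        try {
            List<String> fresh = registry.apply(service);
            cache.put(service, Collections.unmodifiableList(new ArrayList<>(fresh)));
        } catch (RuntimeException e) {
            // Registry unreachable: keep the last known list so calls still resolve.
        }
    }
}
```

With this shape, a failed refresh leaves the previous list in place, so a caller that already resolved `order-service` once can still pick an instance during an outage.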
Further notes:
public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycle
Event listener class:
public abstract class AbstractAutoServiceRegistration<R extends Registration>
        implements AutoServiceRegistration, ApplicationContextAware,
        ApplicationListener<WebServerInitializedEvent>
ApplicationEventPublisherAware: event publishing.
SmartLifecycle is an interface. After the Spring container has loaded and initialized all beans, it calls back the corresponding method (start()) on the classes that implement this interface.
com.alibaba.cloud.nacos.discovery.NacosWatch#nacosServicesWatch
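Nacos source code: heartbeat detection between cluster nodes in the server-side AP architecture
How does a Nacos server know at startup which nodes are in the cluster? When a new node joins or an existing node goes offline, the other nodes find out through health checks. How often do the health checks run? How does a node's state change, and what does a state change trigger?
How does a Nacos server know at startup which nodes are in the cluster?
When a cluster is configured, the IP and port of every node are listed in the cluster.conf configuration file. On startup the Nacos server reads and parses this file; the parsing process is traced below.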
com.alibaba.nacos.core.cluster.ServerMemberManager#ServerMemberManager
public ServerMemberManager(ServletContext servletContext) throws Exception {
    this.serverList = new ConcurrentSkipListMap<>();
    EnvUtil.setContextPath(servletContext.getContextPath());
    init();
}

protected void init() throws NacosException {
    Loggers.CORE.info("Nacos-related cluster resource initialization");
    this.port = EnvUtil.getProperty("server.port", Integer.class, 8848);
    this.localAddress = InetUtils.getSelfIP() + ":" + port;
    this.self = MemberUtil.singleParse(this.localAddress);
    this.self.setExtendVal(MemberMetaDataConstants.VERSION, VersionUtils.version);
    serverList.put(self.getAddress(), self);
    // register NodeChangeEvent publisher to NotifyManager
    // (this registers the MembersChangeEvent publisher)
    registerClusterEvent();
    // Initializes the lookup mode
    // (this loads the initial set of cluster members)
    initAndStartLookup();
    if (serverList.isEmpty()) {
        throw new NacosException(NacosException.SERVER_ERROR, "cannot get serverlist, so exit.");
    }
    Loggers.CORE.info("The cluster resource is initialized");
}
ServerMemberManager#registerClusterEvent
Registers the publisher for MembersChangeEvent.
Subscribes to IPChangeEvent.
com.alibaba.nacos.core.cluster.ServerMemberManager#registerClusterEvent
private void registerClusterEvent() {
    // Register node change events
    NotifyCenter.registerToPublisher(MembersChangeEvent.class,
            EnvUtil.getProperty("nacos.member-change-event.queue.size", Integer.class, 128));
    // The address information of this node needs to be dynamically modified
    // when registering the IP change of this node
    NotifyCenter.registerSubscriber(new Subscriber<InetUtils.IPChangeEvent>() {
        @Override
        public void onEvent(InetUtils.IPChangeEvent event) {
            String newAddress = event.getNewIP() + ":" + port;
            ServerMemberManager.this.localAddress = newAddress;
            EnvUtil.setLocalAddress(localAddress);
            Member self = ServerMemberManager.this.self;
            self.setIp(event.getNewIP());
            String oldAddress = event.getOldIP() + ":" + port;
            // keep the server list and the healthy-address set in sync with the new address
            ServerMemberManager.this.serverList.remove(oldAddress);
            ServerMemberManager.this.serverList.put(newAddress, self);
            ServerMemberManager.this.memberAddressInfos.remove(oldAddress);
            ServerMemberManager.this.memberAddressInfos.add(newAddress);
        }

        @Override
        public Class<? extends Event> subscribeType() {
            return InetUtils.IPChangeEvent.class;
        }
    });
}
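The IP-change subscriber above amounts to re-keying two collections from the old `ip:port` to the new one. A stripped-down sketch of just that bookkeeping, with no Nacos types involved: `AddressBook` and its fields are hypothetical names, and the member is reduced to a placeholder string.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical illustration of the re-keying step; not a Nacos class.
final class AddressBook {
    private final int port;
    // address -> member (a placeholder string stands in for the Member object)
    final ConcurrentSkipListMap<String, String> serverList = new ConcurrentSkipListMap<>();
    // addresses currently considered healthy
    final Set<String> healthyAddresses = ConcurrentHashMap.newKeySet();

    AddressBook(String selfIp, int port) {
        this.port = port;
        String address = selfIp + ":" + port;
        serverList.put(address, "self");
        healthyAddresses.add(address);
    }

    // Move this node's entry from the old address key to the new one.
    void onIpChange(String oldIp, String newIp) {
        String oldAddress = oldIp + ":" + port;
        String newAddress = newIp + ":" + port;
        serverList.remove(oldAddress);
        serverList.put(newAddress, "self");
        healthyAddresses.remove(oldAddress);
        healthyAddresses.add(newAddress);
    }
}
```

Because both collections are keyed by address, forgetting either update would leave a stale entry that other code could still read.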
ServerMemberManager#initAndStartLookup
com.alibaba.nacos.core.cluster.ServerMemberManager#initAndStartLookup
private void initAndStartLookup() throws NacosException {
    this.lookup = LookupFactory.createLookUp(this);
    /**
     * @see com.alibaba.nacos.core.cluster.lookup.FileConfigMemberLookup#start()
     */
    this.lookup.start();
}
FileConfigMemberLookup#start
com.alibaba.nacos.core.cluster.lookup.FileConfigMemberLookup#start
public void start() throws NacosException {
    if (start.compareAndSet(false, true)) {
        // read the cluster.conf file
        readClusterConfFromDisk();
        // Use the inotify mechanism to monitor file changes and automatically
        // trigger the reading of cluster.conf
        try {
            // watch the configuration directory for file changes
            WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);
        } catch (Throwable e) {
            Loggers.CLUSTER.error("An exception occurred in the launch file monitor : {}", e.getMessage());
        }
    }
}
FileConfigMemberLookup#readClusterConfFromDisk
com.alibaba.nacos.core.cluster.lookup.FileConfigMemberLookup#readClusterConfFromDisk
private void readClusterConfFromDisk() {
    Collection<Member> tmpMembers = new ArrayList<>();
    try {
        List<String> tmp = EnvUtil.readClusterConf();
        // parse the lines of cluster.conf into members
        tmpMembers = MemberUtil.readServerConf(tmp);
    } catch (Throwable e) {
        Loggers.CLUSTER
                .error("nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}", e.getMessage());
    }
    /**
     * Publishes the MembersChangeEvent.
     */
    afterLookup(tmpMembers);
}
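To make the parsing step concrete, here is a rough sketch of turning cluster.conf lines into `ip:port` addresses. It is not the body of MemberUtil.readServerConf: `ClusterConfParser` is a hypothetical name, and the handling of `#` comments and the fallback to port 8848 (the `server.port` default seen in init()) are assumptions made for this example. IPv6 addresses are not handled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not the Nacos parser.
final class ClusterConfParser {
    // Assumed fallback when a line carries no port.
    static final int DEFAULT_PORT = 8848;

    static List<String> parse(List<String> lines) {
        List<String> members = new ArrayList<>();
        for (String raw : lines) {
            String line = raw;
            // Drop a trailing or whole-line comment (assumed '#' syntax).
            int hash = line.indexOf('#');
            if (hash >= 0) {
                line = line.substring(0, hash);
            }
            line = line.trim();
            if (line.isEmpty()) {
                continue;
            }
            // Append the default port when only an IP is given.
            members.add(line.contains(":") ? line : line + ":" + DEFAULT_PORT);
        }
        return members;
    }
}
```

For example, the lines `# cluster`, `192.168.0.1:8848`, and `192.168.0.2` would yield two addresses, the second with the default port appended.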
AbstractMemberLookup#afterLookup
com.alibaba.nacos.core.cluster.AbstractMemberLookup#afterLookup
public void afterLookup(Collection<Member> members) {
    this.memberManager.memberChange(members);
}
ServerMemberManager#memberChange
com.alibaba.nacos.core.cluster.ServerMemberManager#memberChange
synchronized boolean memberChange(Collection<Member> members) {
    if (members == null || members.isEmpty()) {
        return false;
    }
    // check whether this node itself is in the member list
    boolean isContainSelfIp = members.stream()
            .anyMatch(ipPortTmp -> Objects.equals(localAddress, ipPortTmp.getAddress()));
    if (isContainSelfIp) {
        isInIpList = true;
    } else {
        isInIpList = false;
        // this node is not in the list, so add it
        members.add(this.self);
        Loggers.CLUSTER.warn("[serverlist] self ip {} not in serverlist {}", self, members);
    }
    // If the number of old and new clusters is different, the cluster information
    // must have changed; if the number of clusters is the same, then compare whether
    // there is a difference; if there is a difference, then the cluster node changes
    // are involved and all recipients need to be notified of the node change event
    // (i.e. decide whether the cluster membership has changed)
    boolean hasChange = members.size() != serverList.size();
    ConcurrentSkipListMap<String, Member> tmpMap = new ConcurrentSkipListMap<>();
    Set<String> tmpAddressInfo = new ConcurrentHashSet<>();
    for (Member member : members) {
        final String address = member.getAddress();
        if (!serverList.containsKey(address)) {
            hasChange = true;
            // If the cluster information in cluster.conf or address-server has been changed,
            // while the corresponding nacos-server has not been started yet, the member's state
            // should be set to DOWN. If the corresponding nacos-server has been started, the
            // member's state will be set to UP after detection in a few seconds.
            member.setState(NodeState.DOWN);
        } else {
            //fix issue # 4925
            member.setState(serverList.get(address).getState());
        }
        // Ensure that the node is created only once
        tmpMap.put(address, member);
        if (NodeState.UP.equals(member.getState())) {
            tmpAddressInfo.add(address);
        }
    }
    serverList = tmpMap;
    memberAddressInfos = tmpAddressInfo;
    Collection<Member> finalMembers = allMembers();
    Loggers.CLUSTER.warn("[serverlist] updated to : {}", finalMembers);
    // Persist the current cluster node information to cluster.conf
    // <important> need to put the event publication into a synchronized block to ensure
    // that the event publication is sequential
    if (hasChange) {
        MemberUtil.syncToFile(finalMembers);
        // publish the MembersChangeEvent
        Event event = MembersChangeEvent.builder().members(finalMembers).build();
        NotifyCenter.publishEvent(event);
    }
    return hasChange;
}
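The change-detection rule in memberChange can be isolated into a few lines: the list has changed if the sizes differ or if any incoming address is unknown; unknown members start as DOWN and known members keep their previous state. A small sketch under those rules, using plain strings for addresses: `MemberListDiff` and its `State` enum are hypothetical stand-ins for the Nacos types, and the incoming list is assumed to contain no duplicates.

```java
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of the merge rule; not a Nacos class.
final class MemberListDiff {
    enum State { UP, DOWN, SUSPICIOUS }

    // The rebuilt address -> state map, sorted by address.
    final Map<String, State> merged = new TreeMap<>();
    // True when the incoming list differs from the current one.
    final boolean changed;

    MemberListDiff(Map<String, State> current, Collection<String> incoming) {
        // A different size already means the membership changed.
        boolean hasChange = incoming.size() != current.size();
        for (String address : incoming) {
            State known = current.get(address);
            if (known == null) {
                // New member: treated as DOWN until a health check succeeds.
                hasChange = true;
                merged.put(address, State.DOWN);
            } else {
                // Existing member: carry its state over.
                merged.put(address, known);
            }
        }
        this.changed = hasChange;
    }
}
```

Note that swapping one node for another keeps the size equal, which is why the per-address check is needed in addition to the size comparison.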
How do the cluster nodes maintain heartbeats with each other?
ServerMemberManager listens for the WebServerInitializedEvent that Spring Boot publishes during startup, and then starts MemberInfoReportTask, the health-check task that runs between cluster nodes.
com.alibaba.nacos.core.cluster.ServerMemberManager#onApplicationEvent
public void onApplicationEvent(WebServerInitializedEvent event) {
    getSelf().setState(NodeState.UP);
    if (!EnvUtil.getStandaloneMode()) {
        // schedule the heartbeat task between server nodes (first run after 5s)
        GlobalExecutor.scheduleByCommon(this.infoReportTask, 5_000L);
    }
    EnvUtil.setPort(event.getWebServer().getPort());
    EnvUtil.setLocalAddress(this.localAddress);
    Loggers.CLUSTER.info("This node is ready to provide external services");
}
The execution of MemberInfoReportTask is analyzed below.
Task#run
MemberInfoReportTask extends Task, and Task implements the Runnable interface. Task#run calls the subclass's executeBody() method.
com.alibaba.nacos.core.cluster.Task#run
public void run() {
    if (shutdown) {
        return;
    }
    try {
        executeBody();
    } catch (Throwable t) {
        Loggers.CORE.error("this task execute has error : {}", ExceptionUtil.getStackTrace(t));
    } finally {
        if (!shutdown) {
            after();
        }
    }
}
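Task#run is a template method: the base class owns the shutdown check and the error handling, and subclasses only supply the body and the follow-up step. A self-contained sketch of the same pattern, with `RepeatingTask` as a hypothetical name and logging reduced to `System.err`:

```java
// Hypothetical illustration of the template-method pattern used by Task.
abstract class RepeatingTask implements Runnable {
    private volatile boolean shutdown = false;

    @Override
    public final void run() {
        if (shutdown) {
            return;
        }
        try {
            executeBody();
        } catch (Throwable t) {
            // A failing body must not prevent the follow-up step.
            System.err.println("task failed: " + t);
        } finally {
            if (!shutdown) {
                after();
            }
        }
    }

    // The actual work, supplied by the subclass.
    protected abstract void executeBody();

    // Follow-up hook; runs after every execution unless the task was shut down.
    protected void after() {
    }

    void shutdown() {
        shutdown = true;
    }
}
```

Because after() sits in the finally block, a subclass that resubmits itself there keeps running periodically even when one execution throws. Whether MemberInfoReportTask does exactly that is not shown in the code quoted here, so treat it as an assumption.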
MemberInfoReportTask#executeBody
Each run picks the next node in the cluster (excluding this node) in round-robin order and sends it a heartbeat, so every node is covered over successive runs.
com.alibaba.nacos.core.cluster.ServerMemberManager.MemberInfoReportTask#executeBody
protected void executeBody() {
    // all members except this node
    List<Member> members = ServerMemberManager.this.allMembersWithoutSelf();
    if (members.isEmpty()) {
        return;
    }
    // advance the cursor by one on every run
    this.cursor = (this.cursor + 1) % members.size();
    Member target = members.get(cursor);
    Loggers.CLUSTER.debug("report the metadata to the node : {}", target.getAddress());
    // /nacos/v1/core/cluster/report
    final String url = HttpUtils
            .buildUrl(false, target.getAddress(), EnvUtil.getContextPath(), Commons.NACOS_CORE_CONTEXT,
                    "/cluster/report");
    try {
        Header header = Header.newInstance().addParam(Constants.NACOS_SERVER_HEADER, VersionUtils.version);
        AuthHeaderUtil.addIdentityToHeader(header);
        asyncRestTemplate
                .post(url, header,
                        Query.EMPTY, getSelf(), reference.getType(), new Callback<String>() {
                            @Override
                            public void onReceive(RestResult<String> result) {
                                if (result.getCode() == HttpStatus.NOT_IMPLEMENTED.value()
                                        || result.getCode() == HttpStatus.NOT_FOUND.value()) {
                                    Loggers.CLUSTER
                                            .warn("{} version is too low, it is recommended to upgrade the version : {}",
                                                    target, VersionUtils.version);
                                    return;
                                }
                                if (result.ok()) {
                                    // success
                                    MemberUtil.onSuccess(ServerMemberManager.this, target);
                                } else {
                                    Loggers.CLUSTER
                                            .warn("failed to report new info to target node : {}, result : {}",
                                                    target.getAddress(), result);
                                    // failure
                                    MemberUtil.onFail(ServerMemberManager.this, target);
                                }
                            }

                            @Override
                            public void onError(Throwable throwable) {
                                Loggers.CLUSTER
                                        .error("failed to report new info to target node : {}, error : {}",
                                                target.getAddress(),
                                                ExceptionUtil.getAllExceptionMsg(throwable));
                                // failure
                                MemberUtil.onFail(ServerMemberManager.this, target, throwable);
                            }

                            @Override
                            public void onCancel() {
                            }
                        });
    } catch (Throwable ex) {
        Loggers.CLUSTER.error("failed to report new info to target node : {}, error : {}", target.getAddress(),
                ExceptionUtil.getAllExceptionMsg(ex));
    }
}
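The cursor arithmetic in executeBody means that one run contacts exactly one peer, and the modulo keeps the index valid even if the member list shrinks between runs. A tiny sketch of that selection step on its own, with `RoundRobinTarget` as a hypothetical name and the cursor assumed to start at 0:

```java
import java.util.List;

// Hypothetical illustration of the cursor arithmetic; not a Nacos class.
final class RoundRobinTarget {
    // Assumed initial value; the first pick is therefore index 1 when available.
    private int cursor = 0;

    // Returns the next member to contact, or null when there are no peers.
    String next(List<String> members) {
        if (members.isEmpty()) {
            return null;
        }
        // Recomputed against the current size, so a shrunken list stays in range.
        cursor = (cursor + 1) % members.size();
        return members.get(cursor);
    }
}
```

With three peers, a full sweep therefore takes three runs of the task, which affects how quickly a failed node is noticed.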
MemberUtil#onSuccess
com.alibaba.nacos.core.cluster.MemberUtil#onSuccess
public static void onSuccess(final ServerMemberManager manager, final Member member) {
    final NodeState old = member.getState();
    manager.getMemberAddressInfos().add(member.getAddress());
    // set the node state to UP
    member.setState(NodeState.UP);
    member.setFailAccessCnt(0);
    if (!Objects.equals(old, member.getState())) {
        // the state changed, so publish a MembersChangeEvent
        manager.notifyMemberChange();
    }
}
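The important detail in onSuccess is that a notification goes out only when the state actually changes, so a healthy node that keeps answering does not flood subscribers with events. A compact sketch of that success path, where `MemberHealth` is a hypothetical name and a counter stands in for notifyMemberChange():

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of the success path; not a Nacos class.
final class MemberHealth {
    enum State { UP, DOWN, SUSPICIOUS }

    final String address;
    final Set<String> healthyAddresses = new HashSet<>();
    State state = State.DOWN;
    int failAccessCnt = 0;
    // Stand-in for manager.notifyMemberChange(): counts published change events.
    int notifications = 0;

    MemberHealth(String address) {
        this.address = address;
    }

    void onSuccess() {
        State old = state;
        healthyAddresses.add(address);
        state = State.UP;
        failAccessCnt = 0;
        // Publish only on an actual transition, e.g. DOWN -> UP.
        if (old != state) {
            notifications++;
        }
    }
}
```

Calling `onSuccess()` twice in a row on a fresh instance records one notification, not two.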
MemberUtil#onFail
com.alibaba.nacos.core.cluster.MemberUtil#onFail(com.alibaba.nacos.core.cluster.ServerMemberManager, com.alibaba.nacos.core.cluster.Member, java.lang.Throwable)
public static void onFail(final ServerMemberManager manager, final Member member, Throwable ex) {
    manager.getMemberAddressInfos().remove(member.getAddress());
    final NodeState old = member.getState();
    // set the node state to SUSPICIOUS
    member.setState(NodeState.SUSPICIOUS);
    member.…