Nacos源码分析专题-服务发现
Posted IT-老牛
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Nacos源码分析专题-服务发现相关的知识,希望对你有一定的参考价值。
1.客户端
1.1.定时更新服务列表
1.1.1.NacosNamingService
在前面我们讲到一个类NacosNamingService
,这个类不仅仅提供了服务注册功能,同样提供了服务发现的功能。
多个重载的方法最终都会进入一个方法:
@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
boolean subscribe) throws NacosException
ServiceInfo serviceInfo;
// 1.判断是否需要订阅服务信息(默认为 true)
if (subscribe)
// 1.1.订阅服务信息
serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
else
// 1.2.直接去nacos拉取服务信息
serviceInfo = hostReactor
.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
// 2.从服务信息中获取实例列表并返回
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts()))
return new ArrayList<Instance>();
return list;
1.1.2.HostReactor
进入订阅服务消息,这里是由HostReactor
类的getServiceInfo()
方法来实现的:
public ServiceInfo getServiceInfo(final String serviceName, final String clusters)
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
// 由 服务名@@集群名拼接 key
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch())
return failoverReactor.getService(key);
// 读取本地服务列表的缓存,缓存是一个Map,格式:Map<String, ServiceInfo>
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
// 判断缓存是否存在
if (null == serviceObj)
// 不存在,创建空ServiceInfo
serviceObj = new ServiceInfo(serviceName, clusters);
// 放入缓存
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
// 放入待更新的服务列表(updatingMap)中
updatingMap.put(serviceName, new Object());
// 立即更新服务列表
updateServiceNow(serviceName, clusters);
// 从待更新列表中移除
updatingMap.remove(serviceName);
else if (updatingMap.containsKey(serviceName))
// 缓存中有,但是需要更新
if (UPDATE_HOLD_INTERVAL > 0)
// hold a moment waiting for update finish 等待5秒中,待更新完成
synchronized (serviceObj)
try
serviceObj.wait(UPDATE_HOLD_INTERVAL);
catch (InterruptedException e)
NAMING_LOGGER
.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
// 开启定时更新服务列表的功能
scheduleUpdateIfAbsent(serviceName, clusters);
// 返回缓存中的服务信息
return serviceInfoMap.get(serviceObj.getKey());
基本逻辑就是先从本地缓存读,根据结果来选择:
- 如果本地缓存没有,立即去
nacos
读取,updateServiceNow(serviceName, clusters)
- 如果本地缓存有,则开启定时更新功能,并返回缓存结果:
scheduleUpdateIfAbsent(serviceName, clusters)
在UpdateTask
中,最终还是调用updateService
方法:
不管是立即更新服务列表,还是定时更新服务列表,最终都会执行HostReactor
中的updateService()
方法:
public void updateService(String serviceName, String clusters) throws NacosException
ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
try
// 基于ServerProxy发起远程调用,查询服务列表
String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
if (StringUtils.isNotEmpty(result))
// 处理查询结果
processServiceJson(result);
finally
if (oldService != null)
synchronized (oldService)
oldService.notifyAll();
1.1.3.ServerProxy
而ServerProxy
的queryList
方法如下:
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
throws NacosException
// 准备请求参数
final Map<String, String> params = new HashMap<String, String>(8);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(udpPort));
params.put("clientIP", NetUtils.localIP());
params.put("healthyOnly", String.valueOf(healthyOnly));
// 发起请求,地址与API接口一致
return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
1.2.处理服务变更通知
除了定时更新服务列表的功能外,Nacos还支持服务列表变更时的主动推送功能。
在HostReactor
类的构造函数中,有非常重要的几个步骤:
基本思路是:
- 通过
PushReceiver
监听服务端推送的变更数据 - 解析数据后,通过
NotifyCenter
发布服务变更的事件 InstanceChangeNotifier
监听变更事件,完成对服务列表的更新
1.2.1.PushReceiver
我们先看PushReceiver
,这个类会以UDP
方式接收Nacos
服务端推送的服务变更数据。
先看构造函数:
public PushReceiver(HostReactor hostReactor)
try
this.hostReactor = hostReactor;
// 创建 UDP客户端
String udpPort = getPushReceiverUdpPort();
if (StringUtils.isEmpty(udpPort))
this.udpSocket = new DatagramSocket();
else
this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));
// 准备线程池
this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory()
@Override
public Thread newThread(Runnable r)
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.push.receiver");
return thread;
);
// 开启线程任务,准备接收变更数据
this.executorService.execute(this);
catch (Exception e)
NAMING_LOGGER.error("[NA] init udp socket failed", e);
PushReceiver
构造函数中基于线程池来运行任务。这是因为PushReceiver
本身也是一个Runnable
,其中的run方法业务逻辑如下:
@Override
public void run()
while (!closed)
try
// byte[] is initialized with 0 full filled by default
byte[] buffer = new byte[UDP_MSS];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
// 接收推送数据
udpSocket.receive(packet);
// 解析为json字符串
String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
// 反序列化为对象
PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
String ack;
if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type))
// 交给 HostReactor去处理
hostReactor.processServiceJson(pushPacket.data);
// send ack to server 发送ACK回执,略。。
catch (Exception e)
if (closed)
return;
NAMING_LOGGER.error("[NA] error while receiving push data", e);
1.2.2.HostReactor
通知数据的处理由交给了HostReactor
的processServiceJson
方法:
public ServiceInfo processServiceJson(String json)
// 解析出ServiceInfo信息
ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
String serviceKey = serviceInfo.getKey();
if (serviceKey == null)
return null;
// 查询缓存中的 ServiceInfo
ServiceInfo oldService = serviceInfoMap.get(serviceKey);
// 如果缓存存在,则需要校验哪些数据要更新
boolean changed = false;
if (oldService != null)
// 拉取的数据是否已经过期
if (oldService.getLastRefTime() > serviceInfo.getLastRefTime())
NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: "
+ serviceInfo.getLastRefTime());
// 放入缓存
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
// 中间是缓存与新数据的对比,得到newHosts:新增的实例;remvHosts:待移除的实例;
// modHosts:需要修改的实例
if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0)
// 发布实例变更的事件
NotifyCenter.publishEvent(new InstancesChangeEvent(
serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
DiskCache.write(serviceInfo, cacheDir);
else
// 本地缓存不存在
changed = true;
// 放入缓存
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
// 直接发布实例变更的事件
NotifyCenter.publishEvent(new InstancesChangeEvent(
serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
serviceInfo.setJsonFromServer(json);
DiskCache.write(serviceInfo, cacheDir);
// 。。。
return serviceInfo;
2.服务端
2.1.拉取服务列表接口
在介绍的InstanceController
中,提供了拉取服务列表的接口:
/**
* Get all instance of input service.
*
* @param request http request
* @return list of instance
* @throws Exception any error during list
*/
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception
// 从request中获取namespaceId和serviceName
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
String agent = WebUtils.getUserAgent(request);
String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
// 获取客户端的 UDP端口
int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
// 获取服务列表
return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
healthyOnly);
进入doSrvIpxt()
方法来获取服务列表:
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent,
String clusters, String clientIP,
int udpPort, String env, boolean isCheck,
String app, String tid, boolean healthyOnly) throws Exception
ClientInfo clientInfo = new ClientInfo(agent);
ObjectNode result = JacksonUtils.createEmptyJsonNode();
// 获取服务列表信息
Service service = serviceManager.getService(namespaceId, serviceName);
long cacheMillis = switchDomain.getDefaultCacheMillis();
// now try to enable the push
try
if (udpPort > 0 && pushService.canEnablePush(agent))
// 添加当前客户端 IP、UDP端口到 PushService 中
pushService
.addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
pushDataSource, tid, app);
cacheMillis = switchDomain.getPushCacheMillis(serviceName);
catch (Exception e)
Loggers.SRV_LOG
.error("[NACOS-API] failed to added push client , :", clientInfo, clientIP, udpPort, e);
cacheMillis = switchDomain.getDefaultCacheMillis();
if (service == null)
// 如果没找到,返回空
if (LoggersNacos源码分析专题-服务发现