Nacos源码分析专题-服务发现

Posted IT-老牛

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Nacos源码分析专题-服务发现相关的知识,希望对你有一定的参考价值。

1.客户端

1.1.定时更新服务列表

1.1.1.NacosNamingService

在前面我们讲到一个类NacosNamingService,这个类不仅仅提供了服务注册功能,同样提供了服务发现的功能。

多个重载的方法最终都会进入一个方法:

@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
                                      boolean subscribe) throws NacosException 

    ServiceInfo serviceInfo;
    // 1.判断是否需要订阅服务信息(默认为 true)
    if (subscribe) 
        // 1.1.订阅服务信息
        serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
                                                 StringUtils.join(clusters, ","));
     else 
        // 1.2.直接去nacos拉取服务信息
        serviceInfo = hostReactor
            .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
                                              StringUtils.join(clusters, ","));
    
    // 2.从服务信息中获取实例列表并返回
    List<Instance> list;
    if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) 
        return new ArrayList<Instance>();
    
    return list;


1.1.2.HostReactor

进入订阅服务消息,这里是由HostReactor类的getServiceInfo()方法来实现的:

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) 

    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    // 由 服务名@@集群名拼接 key
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) 
        return failoverReactor.getService(key);
    
    // 读取本地服务列表的缓存,缓存是一个Map,格式:Map<String, ServiceInfo>
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
    // 判断缓存是否存在
    if (null == serviceObj) 
        // 不存在,创建空ServiceInfo
        serviceObj = new ServiceInfo(serviceName, clusters);
        // 放入缓存
        serviceInfoMap.put(serviceObj.getKey(), serviceObj);
        // 放入待更新的服务列表(updatingMap)中
        updatingMap.put(serviceName, new Object());
        // 立即更新服务列表
        updateServiceNow(serviceName, clusters);
        // 从待更新列表中移除
        updatingMap.remove(serviceName);

     else if (updatingMap.containsKey(serviceName)) 
        // 缓存中有,但是需要更新
        if (UPDATE_HOLD_INTERVAL > 0) 
            // hold a moment waiting for update finish 等待5秒中,待更新完成
            synchronized (serviceObj) 
                try 
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                 catch (InterruptedException e) 
                    NAMING_LOGGER
                        .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                
            
        
    
    // 开启定时更新服务列表的功能
    scheduleUpdateIfAbsent(serviceName, clusters);
    // 返回缓存中的服务信息
    return serviceInfoMap.get(serviceObj.getKey());


基本逻辑就是先从本地缓存读,根据结果来选择:

  • 如果本地缓存没有,立即去nacos读取,updateServiceNow(serviceName, clusters)
  • 如果本地缓存有,则开启定时更新功能,并返回缓存结果:
    • scheduleUpdateIfAbsent(serviceName, clusters)

      UpdateTask中,最终还是调用updateService方法:

      不管是立即更新服务列表,还是定时更新服务列表,最终都会执行HostReactor中的updateService()方法:
public void updateService(String serviceName, String clusters) throws NacosException 
    ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    try 
		// 基于ServerProxy发起远程调用,查询服务列表
        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);

        if (StringUtils.isNotEmpty(result)) 
            // 处理查询结果
            processServiceJson(result);
        
     finally 
        if (oldService != null) 
            synchronized (oldService) 
                oldService.notifyAll();
            
        
    


1.1.3.ServerProxy

ServerProxyqueryList方法如下:

public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
    throws NacosException 
	// 准备请求参数
    final Map<String, String> params = new HashMap<String, String>(8);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put("clusters", clusters);
    params.put("udpPort", String.valueOf(udpPort));
    params.put("clientIP", NetUtils.localIP());
    params.put("healthyOnly", String.valueOf(healthyOnly));
	// 发起请求,地址与API接口一致
    return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);


1.2.处理服务变更通知

除了定时更新服务列表的功能外,Nacos还支持服务列表变更时的主动推送功能。

HostReactor类的构造函数中,有非常重要的几个步骤:

基本思路是:

  • 通过PushReceiver监听服务端推送的变更数据
  • 解析数据后,通过NotifyCenter发布服务变更的事件
  • InstanceChangeNotifier监听变更事件,完成对服务列表的更新

1.2.1.PushReceiver

我们先看PushReceiver,这个类会以UDP方式接收Nacos服务端推送的服务变更数据。

先看构造函数:

public PushReceiver(HostReactor hostReactor) 
    try 
        this.hostReactor = hostReactor;
        // 创建 UDP客户端
        String udpPort = getPushReceiverUdpPort();
        if (StringUtils.isEmpty(udpPort)) 
            this.udpSocket = new DatagramSocket();
         else 
            this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));
        
        // 准备线程池
        this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() 
            @Override
            public Thread newThread(Runnable r) 
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.push.receiver");
                return thread;
            
        );
		// 开启线程任务,准备接收变更数据
        this.executorService.execute(this);
     catch (Exception e) 
        NAMING_LOGGER.error("[NA] init udp socket failed", e);
    


PushReceiver构造函数中基于线程池来运行任务。这是因为PushReceiver本身也是一个Runnable,其中的run方法业务逻辑如下:

@Override
public void run() 
    while (!closed) 
        try 
            // byte[] is initialized with 0 full filled by default
            byte[] buffer = new byte[UDP_MSS];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
			// 接收推送数据
            udpSocket.receive(packet);
			// 解析为json字符串
            String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
            NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
			// 反序列化为对象
            PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
            String ack;
            if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) 
                // 交给 HostReactor去处理
                hostReactor.processServiceJson(pushPacket.data);

                // send ack to server 发送ACK回执,略。。
         catch (Exception e) 
            if (closed) 
                return;
            
            NAMING_LOGGER.error("[NA] error while receiving push data", e);
        
    


1.2.2.HostReactor

通知数据的处理由交给了HostReactorprocessServiceJson方法:

public ServiceInfo processServiceJson(String json) 
    // 解析出ServiceInfo信息
    ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
    String serviceKey = serviceInfo.getKey();
    if (serviceKey == null) 
        return null;
    
    // 查询缓存中的 ServiceInfo
    ServiceInfo oldService = serviceInfoMap.get(serviceKey);

    // 如果缓存存在,则需要校验哪些数据要更新
    boolean changed = false;
    if (oldService != null) 
		// 拉取的数据是否已经过期
        if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) 
            NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: "
                               + serviceInfo.getLastRefTime());
        
        // 放入缓存
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
		
        // 中间是缓存与新数据的对比,得到newHosts:新增的实例;remvHosts:待移除的实例;
        // modHosts:需要修改的实例
        if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) 
            // 发布实例变更的事件
            NotifyCenter.publishEvent(new InstancesChangeEvent(
                serviceInfo.getName(), serviceInfo.getGroupName(),
                serviceInfo.getClusters(), serviceInfo.getHosts()));
            DiskCache.write(serviceInfo, cacheDir);
        

     else 
        // 本地缓存不存在
        changed = true;
        // 放入缓存
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        // 直接发布实例变更的事件
        NotifyCenter.publishEvent(new InstancesChangeEvent(
            serviceInfo.getName(), serviceInfo.getGroupName(),
            serviceInfo.getClusters(), serviceInfo.getHosts()));
        serviceInfo.setJsonFromServer(json);
        DiskCache.write(serviceInfo, cacheDir);
    
	// 。。。
    return serviceInfo;


2.服务端

2.1.拉取服务列表接口

在介绍的InstanceController中,提供了拉取服务列表的接口:

/**
     * Get all instance of input service.
     *
     * @param request http request
     * @return list of instance
     * @throws Exception any error during list
     */
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception 
    // 从request中获取namespaceId和serviceName
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);

    String agent = WebUtils.getUserAgent(request);
    String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
    String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
    // 获取客户端的 UDP端口
    int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
    String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
    boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));

    String app = WebUtils.optional(request, "app", StringUtils.EMPTY);

    String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);

    boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));

    // 获取服务列表
    return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
                     healthyOnly);


进入doSrvIpxt()方法来获取服务列表:

public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent,
                            String clusters, String clientIP,
                            int udpPort, String env, boolean isCheck,
                            String app, String tid, boolean healthyOnly) throws Exception 
    ClientInfo clientInfo = new ClientInfo(agent);
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    // 获取服务列表信息
    Service service = serviceManager.getService(namespaceId, serviceName);
    long cacheMillis = switchDomain.getDefaultCacheMillis();

    // now try to enable the push
    try 
        if (udpPort > 0 && pushService.canEnablePush(agent)) 
			// 添加当前客户端 IP、UDP端口到 PushService 中
            pushService
                .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
                           pushDataSource, tid, app);
            cacheMillis = switchDomain.getPushCacheMillis(serviceName);
        
     catch (Exception e) 
        Loggers.SRV_LOG
            .error("[NACOS-API] failed to added push client , :", clientInfo, clientIP, udpPort, e);
        cacheMillis = switchDomain.getDefaultCacheMillis();
    

    if (service == null) 
        // 如果没找到,返回空
        if (LoggersNacos源码分析专题-服务发现

Nacos源码分析专题-服务注册

Nacos源码分析专题-服务注册

Nacos源码分析专题-服务注册

Nacos源码分析专题-服务心跳

Nacos源码分析专题-服务心跳