nacos源码分析-服务注册(服务端)

Posted 墨家巨子@俏如来

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了nacos源码分析-服务注册(服务端)相关的知识,希望对你有一定的参考价值。

安装Nacos源码

上一篇文章我们了解了《Nacos服务注册》客户端源码,本篇文章我们来看一下服务注册Nacos服务端的源码执行情况。首先需要下载Nacos源码, https://github.com/alibaba/nacos/releases/tag/1.4.3
解压之后使用IDEA工具导入即可。


但是编译过后发现代码会报错,主要是缺少实体类,比如:

安装protobuf

这主要是应该nacos数据通信底层使用到protobuf进行序列化(与JSON类似),是Google提供的一种数据序列化协议

Protocol Buffers 是一种轻便高效的结构化数据存储格式,可以用于结构化数据序列化,很适合做数据存储或 RPC 数据交换格式。它可用于通讯协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式。

所以这里我们需要安装protobuf ,先去下载 https://github.com/protocolbuffers/protobuf/releases,下载window版本如下:

  • 下载之后解压

  • 然后需要配置环境变量

  • 找到consistency模块,进入src/main

  • 进入main目录,执行cmd命令
protoc --java_out=./java ./proto/consistency.proto
protoc --java_out=./java ./proto/Data.proto

效果如下:

启动Nacos

找到console控制台,启动Nacos,第一次启动会报错,因为默认是以集群方式启动,会出现jdbc.properties找不到的错误

  • 然后指定为单机启动,指定VM参数

  • 启动成功

  • 访问 http://localhost:8848/nacos/index.html 进入控制台

  • 到这里,nacos服务端的源码就启动成功了,那么我们尝试启动nacos-client程序,让他注册到nacos-server

  • 查看控制台,nacos-client成功注册到服务端

服务注册

在上一章节《Nacos源码分析-服务注册(客户端)》我们有分析到,nacos-client提交注册的地址是post /nacos/v1/ns/instance,那么我们在nacos-server源码中找到该接口,它位于 naming 模块中的/controllers包下的InstanceController接口中。源码如下

@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController 
    
    @Autowired
    private SwitchDomain switchDomain;
    
    @Autowired
    private PushService pushService;
    
    @Autowired
    private ServiceManager serviceManager;
    
  	...省略...
    
    /**
      注册一个新的实例
     * Register new instance.
     *
     * @param request http request
     * @return 'ok' if success
     * @throws Exception any error during register
     */
	@CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    //request请求对象中包括了注册的服务的port,namespaceId,groupName,serviceName,ip,集群名等等
    public String register(HttpServletRequest request) throws Exception 
        //拿到注册的服务的:namespaceId,默认是public
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        //拿到注册的服务的:serviceName服务名会把组名加在前面,比如:DEFAULT_GROUP@@nacos-client
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        //检查服务名的格式:groupName@@serviceName
        NamingUtils.checkServiceNameFormat(serviceName);
        //解析请求参数,封装服务实例对戏,把注册的服务封装为Instance,其中包括IP,端口,服务名等
        final Instance instance = parseInstance(request);
        //使用ServiceManger注册服务实例
        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    

	//解析要注册的服务实例
	private Instance parseInstance(HttpServletRequest request) throws Exception 
        //拿到服务名 DEFAULT_GROUP@@nacos-client
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        //拿到app,没配置就是:unknown
        String app = WebUtils.optional(request, "app", "DEFAULT");
        //拿到注册服务的:IP,是否开启服务,权重,健康状况,等封装为Instance 对象
        Instance instance = getIpAddress(request);
        instance.setApp(app);
        instance.setServiceName(serviceName);
        // Generate simple instance id first. This value would be updated according to
        // 生成实例的ID:192.168.174.1#8080#DEFAULT#DEFAULT_GROUP@@nacos-client
        instance.setInstanceId(instance.generateInstanceId());
        //设置最后的心跳时间为当前时间
        instance.setLastBeat(System.currentTimeMillis());
        String metadata = WebUtils.optional(request, "metadata", StringUtils.EMPTY);
        if (StringUtils.isNotEmpty(metadata)) 
            instance.setMetadata(UtilsAndCommons.parseMetadata(metadata));
        
        //验证实例
        instance.validate();
        
        return instance;
    

register方法中会从请求对象中拿到注册的参数比如IP,是否开启服务,权重,健康状况等,然后封装为 instance对象,交给 serviceManager.registerInstance 去注册,下面是 serviceManager.registerInstance的源码

缓存和初始化serivce

@Component
public class ServiceManager implements RecordListener<Service> 
    
    /**
     * Map(namespace, Map(group::serviceName, Service)).
     */
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
  
   ...省略部分代码...
	//注册服务实例
	public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException 
	        //1.会尝试从serviceMap(服务注册表)中获取到服务实例,如果没有就会创建一个Service,
	        // 并设置好属性:GroupName,namespaceId,serviceName。然后存储到ServiceManager的一个ConcurrentHashMap中
	        // 服务注册表的结构是Map<String,Map<String,Service>>
	        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
	        //从注册表中获取服务,注册表是一个Map<String,Map<String,Service>>结构,
	        // 先根据namespaceId取得到Map<String,Service>,然后再根据serviceName取Service
	        Service service = getService(namespaceId, serviceName);
	        //参数无效,没有找到服务
	        if (service == null) 
	            throw new NacosException(NacosException.INVALID_PARAM,
	                    "service not found, namespace: " + namespaceId + ", service: " + serviceName);
	        
	        //添加 instance 服务实例到注册表
	        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
	    
	    
	    ...省略部分代码...
	    
		//二.创建service,并初始化
		public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
		            throws NacosException 
	        Service service = getService(namespaceId, serviceName);
	        //如果服务不存在就创建一个service
	        if (service == null) 
	            
	            Loggers.SRV_LOG.info("creating empty service :", namespaceId, serviceName);
	            service = new Service();
	            service.setName(serviceName);
	            service.setNamespaceId(namespaceId);
	            service.setGroupName(NamingUtils.getGroupName(serviceName));
	            // now validate the service. if failed, exception will be thrown
	            service.setLastModifiedMillis(System.currentTimeMillis());
	            service.recalculateChecksum();
	            if (cluster != null) 
	                cluster.setService(service);
	                service.getClusterMap().put(cluster.getName(), cluster);
	            
	            service.validate();
	            //保存service和初始化service
	            putServiceAndInit(service);
	            if (!local) 
	                addOrReplaceService(service);
	            
	        
	    
		//保存service和初始化service
		private void putServiceAndInit(Service service) throws NacosException 
				//保存service
		        putService(service);
		        service = getService(service.getNamespaceId(), service.getName());
		        //初始化service
		        service.init();
		        //consistencyService.listen实现数据一致性监听
		        consistencyService
		                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
		        consistencyService
		                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
		        Loggers.SRV_LOG.info("[NEW-SERVICE] ", service.toJson());
		    
		
	  	//保存service到注册表中
	  	public void putService(Service service) 
	        if (!serviceMap.containsKey(service.getNamespaceId())) 
	            synchronized (putServiceLock) 
	                if (!serviceMap.containsKey(service.getNamespaceId())) 
	                    serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
	                
	            
	        
	        //把注册的服务存储到Map中
	        serviceMap.get(service.getNamespaceId()).putIfAbsent(service.getName(), service);
	    

registerInstance做了三个事情

  • 通过putService()方法将服务缓存到内存

  • service.init()建立心跳机制

  • consistencyService.listen实现数据一致性监听

registerInstance方法会尝试从ServiceManager#serviceMap(服务注册表)中获取到服务实例,如果没有就会创建一个Service,并设置好属性:GroupName,namespaceId,serviceName。然后存储到ServiceManager#serviceMap中。

该Map是一个ConcurrentHashMap,结构是Map<String,Map<String,Service>>。第一个Key是NamespaceId 如:public ,第二个key是服务名,如 : DEFAULT_GROUP@@nacos-client

这就是nacos中的的服务注册表,用来存放注册的服务实例的Map.


注意:service和instance的关系是,一个service中包含一个 Map<String, Cluster> , 一个Cluster中包含一个 Set。

  • service代表一个服务:比如用户服务
  • Cluster代表服务集群,比如2个用户服务形成一个集群
  • 而一个集群中有多个服务实例,所以Cluster中有了Set 来保存服务实例

除此之外还会调用 com.alibaba.nacos.naming.core.Service#init 方法对service进行初始化,下面是init方法的源码

public void init() 
		//clientBeatCheckTask 是一个Runnable,它持有service,它的作用是
       //检查并更新临时实例的状态,如果它们已过期,则将其删除
        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) 
            entry.getValue().setService(this);
            entry.getValue().init();
        
    
//定时任务:定时检查服务的健康状况,5S一次
 public static void scheduleCheck(ClientBeatCheckTask task) 
        futureMap.computeIfAbsent(task.taskKey(),
                k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
    

service.init 初始化方法中主要是把service封装到 ClientBeatCheckTask 对象中,ClientBeatCheckTask 是一个Runnable线程对象,然后使用定时任务5s执行一次健康检查。 ClientBeatCheckTask 的作用是 : 检查并更新临时实例的状态,如果它们已过期,则将其删除

下面是 com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask#run 线程对象的源码

public void run() 
        try 
            if (!getDistroMapper().responsible(service.getName())) 
                return;
            
            
            if (!getSwitchDomain().isHealthCheckEnabled()) 
                return;
            
            //拿到服务中的所有实例
            List<Instance> instances = service.allIPs(true);
            
            // first set health status of instances:
            for (Instance instance : instances) 
            	//当前系统时间 - 实例最后心跳时间 > 默认15s,就意味着超时
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) 
                    if (!instance.isMarked()) 
                        if (instance.isHealthy()) 
                        	//健康状态设置为false
                            instance.setHealthy(false);
                            Loggers.EVT_LOG
                                    .info("POS IP-DISABLED valid: :@@, region: , msg: client timeout after , last beat: ",
                                            instance.getIp(), instance.getPort(), instance.getClusterName(),
                                            service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                            instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                             //发布时间:服务状态改变
                            getPushService().serviceChanged(service);
                            //发布时间:服务实例心跳超时事件
                            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        
                    
                
            
            
            if (!getGlobalConfig().isExpireInstance()) 
                return;
            
            
            // then remove obsolete instances:
            for (Instance instance : instances) 
                
                if (instance.isMarked()) 
                    continue;
                
                
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) 
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: , ip: ", service.getName(),
                            JacksonUtils.toJson(instance));
                    deleteIp(instance);
                
            
            
         catch (Exception e) 
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        
        
    

run方法中会拿到当前service的所有instance,然后循环 , 如果:当前系统时间 - 实例最后心跳时间 > 默认15s,就意味着超时,然后会改变instance的Healthy健康状态Wie false; 并抛出 服务实例心跳超时事件

getPushService().serviceChanged(service):方法很有意思,他的作用是通知 nacos-client该服务已经下线(UDP协议 push),这样的话nacos-client就会从本地剔除掉下线的服务。这就是它和eureka不一样的地方,eureka使用的是pull.而 nacos采用pull + push模式。 具体源码见: PushService#onApplicationEvent

 public void onApplicationEvent(ServiceChangeEvent event) 
        Service service = event.getService();
        String serviceName = service.getName();
        String namespaceId = service.getNamespaceId();
        //使用定时任务 1s 一次
        Future future = GlobalExecutor.scheduleUdpSender(() -> 
            try 
            	//服务改变,添加到 push队列
                Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
                ConcurrentMap<String, PushClient> clients = clientMap
                        .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                if (MapUtils.isEmpty(clients)) 
                    return;
                
                
                Map<String, Object> cache = new HashMap<>(16);
                long lastRefTime = System.nanoTime();
                for (PushClient client : clients.values()) 
                    if (client.zombie()) 
                        Loggers.PUSH.debug("client is zombie: " + client.toString());
                        clients.remove(client.toString());
                        Loggers.PUSH.debug("client is zombie: " + client.toString());
                        continue;
                    
                    
                    

以上是关于nacos源码分析-服务注册(服务端)的主要内容,如果未能解决你的问题,请参考以下文章

nacos源码分析-心跳检测(服务端)

nacos源码分析-心跳检测(服务端)

Nacos3# 服务注册与发现服务端启动源码解析

Nacos4# 服务端响应连接和注册源码分析

Nacos源码系列—服务端那些事儿

Nacos源码1.4.1注册中心服务端