Nacos源码分析专题-服务心跳

Posted IT-老牛

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Nacos源码分析专题-服务心跳相关的知识,希望对你有一定的参考价值。

文章目录

1. 引言

Nacos的实例分为临时实例和永久实例两种,可以通过在yaml 文件配置:

spring:
  application:
    name: order-service
  cloud:
    nacos:
      discovery:
        ephemeral: false # 设置实例为永久实例。true:临时; false:永久
      server-addr: 192.168.150.1:8845

临时实例基于心跳方式做健康检测,而永久实例则是由Nacos主动探测实例状态。

其中Nacos提供的心跳的API接口为:

接口描述:发送某个实例的心跳
请求类型:PUT
请求路径

/nacos/v1/ns/instance/beat

请求参数:

名称类型是否必选描述
serviceName字符串服务名
groupName字符串分组名
ephemeralboolean是否临时实例
beat字符串JSON格式字符串

错误编码:

错误代码描述语义
400Bad Request客户端请求中的语法错误
403Forbidden没有权限
404Not Found无法找到资源
500Internal Server Error服务器内部错误
200OK正常

2. 客户端

在服务注册这一节中,我们说过NacosNamingService这个类实现了服务的注册,同时也实现了服务心跳:

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException 
    NamingUtils.checkInstanceIsLegal(instance);
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // 判断是否是临时实例。
    if (instance.isEphemeral()) 
        // 如果是临时实例,则构建心跳信息BeatInfo
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        // 添加心跳任务
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    
    serverProxy.registerService(groupedServiceName, groupName, instance);

2.1 BeatInfo

这里的BeanInfo就包含心跳需要的各种信息:

2.2. BeatReactor

BeatReactor这个类则维护了一个线程池:

当调用BeatReactor的.addBeatInfo(groupedServiceName, beatInfo)方法时,就会执行心跳:

public void addBeatInfo(String serviceName, BeatInfo beatInfo) 
    NAMING_LOGGER.info("[BEAT] adding beat:  to beat map.", beatInfo);
    String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    BeatInfo existBeat = null;
    //fix #1733
    if ((existBeat = dom2Beat.remove(key)) != null) 
        existBeat.setStopped(true);
    
    dom2Beat.put(key, beatInfo);
    // 利用线程池,定期执行心跳任务,周期为 beatInfo.getPeriod()
    executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
    MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());


心跳周期的默认值在com.alibaba.nacos.api.common.Constants类中:

可以看到是5秒,默认5秒一次心跳。

2.3. BeatTask

心跳的任务封装在BeatTask这个类中,是一个Runnable,其run方法如下:

@Override
public void run() 
    if (beatInfo.isStopped()) 
        return;
    
    // 获取心跳周期
    long nextTime = beatInfo.getPeriod();
    try 
        // 发送心跳
        JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
        long interval = result.get("clientBeatInterval").asLong();
        boolean lightBeatEnabled = false;
        if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) 
            lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
        
        BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
        if (interval > 0) 
            nextTime = interval;
        
        // 判断心跳结果
        int code = NamingResponseCode.OK;
        if (result.has(CommonParams.CODE)) 
            code = result.get(CommonParams.CODE).asInt();
        
        if (code == NamingResponseCode.RESOURCE_NOT_FOUND) 
            // 如果失败,则需要 重新注册实例
            Instance instance = new Instance();
            instance.setPort(beatInfo.getPort());
            instance.setIp(beatInfo.getIp());
            instance.setWeight(beatInfo.getWeight());
            instance.setMetadata(beatInfo.getMetadata());
            instance.setClusterName(beatInfo.getCluster());
            instance.setServiceName(beatInfo.getServiceName());
            instance.setInstanceId(instance.getInstanceId());
            instance.setEphemeral(true);
            try 
                serverProxy.registerService(beatInfo.getServiceName(),
                                            NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
             catch (Exception ignore) 
            
        
     catch (NacosException ex) 
        NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: , code: , msg: ",
                            JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());

     catch (Exception unknownEx) 
        NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: , unknown exception msg: ",
                            JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);
     finally 
        executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
    


2.4. 发送心跳

最终心跳的发送还是通过NamingProxysendBeat方法来实现:

public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException 

    if (NAMING_LOGGER.isDebugEnabled()) 
        NAMING_LOGGER.debug("[BEAT]  sending beat to server: ", namespaceId, beatInfo.toString());
    
    // 组织请求参数
    Map<String, String> params = new HashMap<String, String>(8);
    Map<String, String> bodyMap = new HashMap<String, String>(2);
    if (!lightBeatEnabled) 
        bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
    
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
    params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
    params.put("ip", beatInfo.getIp());
    params.put("port", String.valueOf(beatInfo.getPort()));
    // 发送请求,这个地址就是:/v1/ns/instance/beat
    String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
    return JacksonUtils.toObj(result);


3.服务端

对于临时实例,服务端代码分两部分:

1)InstanceController提供了一个接口,处理客户端的心跳请求
2)定时检测实例心跳是否按期执行

3.1.InstanceController

与服务注册时一样,在nacos-naming模块中的InstanceController类中,定义了一个方法用来处理心跳请求:

@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception 
	// 解析心跳的请求参数
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());

    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    RsInfo clientBeat = null;
    if (StringUtils.isNotBlank(beat)) 
        clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
    
    String clusterName = WebUtils
        .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
    int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
    if (clientBeat != null) 
        if (StringUtils.isNotBlank(clientBeat.getCluster())) 
            clusterName = clientBeat.getCluster();
         else 
            // fix #2533
            clientBeat.setCluster(clusterName);
        
        ip = clientBeat.getIp();
        port = clientBeat.getPort();
    
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: , serviceName: ", clientBeat, serviceName);
    // 尝试根据参数中的namespaceId、serviceName、clusterName、ip、port等信息
    // 从Nacos的注册表中 获取实例
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
	// 如果获取失败,说明心跳失败,实例尚未注册
    if (instance == null) 
        if (clientBeat == null) 
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        

        Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
                             + "perform data compensation operations, beat: , serviceName: ", clientBeat, serviceName);
		// 这里重新注册一个实例
        instance = new Instance();
        instance.setPort(clientBeat.getPort());
        instance.setIp(clientBeat.getIp());
        instance.setWeight(clientBeat.getWeight());
        instance.setMetadata(clientBeat.getMetadata());
        instance.setClusterName(clusterName);
        instance.setServiceName(serviceName);
        instance.setInstanceId(instance.getInstanceId());
        instance.setEphemeral(clientBeat.isEphemeral());

        serviceManager.registerInstance(namespaceId, serviceName, instance);
    
	// 尝试基于namespaceId和serviceName从 注册表中获取Service服务
    Service service = serviceManager.getService(namespaceId, serviceName);
	// 如果不存在,说明服务不存在,返回404
    if (service == null) 
        throw new NacosException(NacosException.SERVER_ERROR,
                                 "service not found: " + serviceName + "@" + namespaceId);
    
    if (clientBeat == null) 
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(clusterName);
    
    // 如果心跳没问题,开始处理心跳结果
    service.processClientBeat(clientBeat);

    result.put(CommonParams.CODE, NamingResponseCode.OK);
    if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) 
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
    
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;


最终,在确认心跳请求对应的服务、实例都在的情况下,开始交给Service类处理这次心跳请求。调用了ServiceprocessClientBeat方法

3.2.处理心跳请求

查看Serviceservice.processClientBeat(clientBeat);方法:

public void processClientBeat(final RsInfo rsInfo) 
    ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
    clientBeatProcessor.setService(this);
    clientBeatProcessor.setRsInfo(rsInfo);
    HealthCheckReactor.scheduleNow(clientBeatProcessor);

可以看到心跳信息被封装到了 ClientBeatProcessor类中,交给了HealthCheckReactor处理,HealthCheckReactor就是对线程池的封装,不用过多查看。

关键的业务逻辑都在ClientBeatProcessor这个类中,它是一个Runnable,其中的run方法如下:

@Override
public void run() 
    Service service = this.service;
    if (Loggers.EVT_LOG.isDebugEnabled()) 
        Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: ", rsInfo.toString());
    

    String ip = rsInfo.getIp();
    String clusterName = rsInfo.getCluster();
    int port = rsInfo.getPort();
    // 获取集群信息
    Cluster cluster = service.getClusterMap().get(clusterName);
    // 获取集群中的所有实例信息
    List<Instance> instances = cluster.allIPs(true);

    for (Instance instance : instances) 
        // 找到心跳的这个实例
        if (instance.getIp().equals(ip) && instance.getPort() == port) 
            if (Loggers.EVT_LOG.isDebugEnabled()) 
                Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: ", rsInfo.toString());
            
            // 更新实例的最后一次心跳时间 lastBeat
            instance.setLastBeat(以上是关于Nacos源码分析专题-服务心跳的主要内容,如果未能解决你的问题,请参考以下文章

Nacos源码分析专题-服务心跳

Nacos源码分析专题-Nacos小结

Nacos源码分析专题-Nacos小结

Nacos源码分析专题-Nacos小结

nacos源码分析-心跳检测(服务端)

nacos源码分析-心跳检测(服务端)