Nacos源码1.4.1注册中心服务端

Posted 程序猿阿越

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Nacos源码1.4.1注册中心服务端相关的知识,希望对你有一定的参考价值。

前言

本章分析Nacos1.4.1注册中心服务端,主要关注临时实例(Instance.ephemeral=true,默认情况)

  • 服务端模型:ServiceManager、Service、Cluster、Instance

  • Distro协议:服务于注册中心的AP协议

  • 服务查询:UDP监听、保护模式

  • Nacos集群管理:集群初始化,集群健康检查

  • Distro协议对于写请求的处理方式

  • 服务注册:节点本地数据更新、集群数据同步、UDP推送客户端

  • 客户端心跳:心跳超时处理

  • 集群数据同步:Nacos如何保证每个节点数据一致

一、服务端模型

从逻辑上,命名服务的模型如下。

Nacos源码(八)1.4.1注册中心服务端
命名服务逻辑模型.png

服务端与客户端的主要不同点在于,客户端的NamingService是针对某个Naocs集群的某个Namespace创建的,客户端使用过程Namespace是不变的。从服务端实现的角度看,如下。

Nacos源码(八)1.4.1注册中心服务端
命名服务实现模型.png

ServiceManager单例,管理Namespace-Group-Service的映射关系,其中Map 的Key是groupName@@serviceName。

@Component
public class ServiceManager implements RecordListener<Service{
    // namespace - groupName@@serviceName - Service
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
}

com.alibaba.nacos.naming.core.Service存储了Namespace和Group和Service的所有信息。

// com.alibaba.nacos.naming.core.Service
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements RecordRecordListener<Instances{
    // 所属namespace
    private String namespaceId;
}
// com.alibaba.nacos.api.naming.pojo.Service
public class Service implements Serializable {
    // groupName@@serviceName
    private String name;
    // 服务保护阈值,当大多数服务下线,认为当前注册中心节点发生故障,返回所有实例,包括非健康实例
    private float protectThreshold = 0.0F;
    // 分组
    private String groupName;
}

此外com.alibaba.nacos.naming.core.Service管理其下的所有Cluster。

// Cluster注册表,key是集群名称
private Map<String, Cluster> clusterMap = new HashMap<>();

Cluster管理所有持久实例和临时实例。

// com.alibaba.nacos.naming.core.Cluster
public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {
    // 持久Instance
    private Set<Instance> persistentInstances = new HashSet<>();
    // 临时Instance
    private Set<Instance> ephemeralInstances = new HashSet<>();
    // 所属Service
    private Service service;
}
// com.alibaba.nacos.api.naming.pojo.Cluster
public class Cluster implements Serializable {
    /**
     * Name of belonging service.
     */

    private String serviceName;
    /**
     * Name of cluster.
     */

    private String name;
}

Instance服务实例,客户端服务注册的单位。

// com.alibaba.nacos.naming.core.Instance
public class Instance extends com.alibaba.nacos.api.naming.pojo.Instance implements Comparable {
    // 上次心跳时间
    private volatile long lastBeat = System.currentTimeMillis();
    // namespace
    private String tenant;
}
// com.alibaba.nacos.api.naming.pojo.Instance
public class Instance implements Serializable {
    // 唯一标识 = 分组+集群+服务+ip+端口
    private String instanceId;
    private String ip;
    private int port;
    private double weight = 1.0D;
    private boolean healthy = true;
    private boolean enabled = true;
    // 是否临时节点
    private boolean ephemeral = true;
    // 所属集群
    private String clusterName;
    // 服务名 = 分组+服务
    private String serviceName;
    // 元数据
    private Map<String, String> metadata = new HashMap<String, String>();
}

二、distro协议

Nacos注册中心采用distro协议,用于注册中心,AP。

对于客户端,服务端每个节点是对等的,无论读写请求发往哪个节点都可以被处理。如果某个节点处理失败,客户端会重新选择一个节点请求。客户端的请求主要包括:服务注册、服务查询、服务监听、心跳请求等。

// NamingProxy#reqApi
public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers,
        String method)
 throws NacosException 
{
    // 随机选择某个节点作为第一次请求节点
    Random random = new Random(System.currentTimeMillis());
    int index = random.nextInt(servers.size());
    for (int i = 0; i < servers.size(); i++) {
      String server = servers.get(index);
      try {
        return callServer(api, params, body, server, method);
      } catch (NacosException e) {
        exception = e;
      }
      // 发生异常,选择下一个节点尝试请求
      index = (index + 1) % servers.size();
    }
   // ...
}

对于服务端要处理以下操作:

  • 读:集群每个节点都存储所有数据,每个节点都可以处理读请求,返回当前节点注册表里的数据,无论数据是否是最新的。(GET /nacos/v1/ns/instance/list)

  • 写:以服务为单位,每个节点负责一部分服务。从服务维度来说,如果当前节点负责这个服务,那么这个节点称为责任节点。客户端向随机服务端发起写请求后,服务端判断自己是否是责任节点,如果是的话自己处理,否则转发至其他节点。(DistroFilter)

  • 客户端心跳:服务端如果长时间未收到客户端心跳,则下线该服务实例(ClientBeatCheckTask、ClientBeatProcessor);服务端如果收到客户端心跳,但服务不存在,执行注册逻辑(BeatReactor、PUT /nacos/v1/ns/instance/beat)。备注:临时实例注册表是通过客户端主动发送心跳来维护的;持久实例是通过服务端主动做健康检查来维护的,这里只考虑临时实例注册。

  • 集群管理:每个服务端节点主动发送健康检查到其他节点,响应成功的节点被该节点视为健康节点;另外健康检查也负责同步集群成员(MemberInfoReportTask)。

  • 数据同步:以服务为单位,责任节点处理完成写请求后,异步将数据同步给其他节点,保证最终所有节点的注册表信息一致(DistroProtocol)。

三、集群管理

集群管理是Nacos的通用功能,非注册中心独有。

Nacos源码(八)1.4.1注册中心服务端
控制台-集群管理.png

ServerMemberManager负责Nacos集群管理。

@Component(value = "serverMemberManager")
public class ServerMemberManager implements ApplicationListener<WebServerInitializedEvent{
    // 所有nacos节点
    private volatile ConcurrentSkipListMap<String, Member> serverList;
    // nacos自己如何发现nacos服务
    private MemberLookup lookup;
    // 当前nacos节点
    private volatile Member self;
    // 健康状态的节点地址集合
    private volatile Set<String> memberAddressInfos = new ConcurrentHashSet<>();
    // 集群成员信息广播任务
    private final MemberInfoReportTask infoReportTask = new MemberInfoReportTask();
    private volatile boolean isInIpList = true;
    private int port;
    private String localAddress;
}

Member代表Nacos集群节点。

public class Member implements Comparable<Member>, Cloneable {
    // ip
    private String ip;
    // port
    private int port = -1;
    // 状态
    private volatile NodeState state = NodeState.UP;
    // 扩展信息
    private Map<String, Object> extendInfo = Collections.synchronizedMap(new TreeMap<>());
    // ip:port
    private String address = "";
    // 健康检查连续失败次数
    private transient int failAccessCnt = 0;
}

NodeState,节点状态。

public enum NodeState {
    STARTING,
    UP,
    SUSPICIOUS,
    DOWN,
    ISOLATION,
}

只有UP、SUSPICIOUS、DOWN三种状态在使用。

  • UP:健康检查通过。

  • SUSPICIOUS:健康检查失败,且失败次数小于一定阈值。

  • DOWN:健康检查失败,且失败次数大于一定阈值。

1、集群初始化

一个新加入的节点,需要通过MemberLookup初始化内存中的集群列表ServerMemberManager.serverList,才能进行后续心跳发送加入集群。

// ServerMemberManager
protected void init() throws NacosException {
    this.port = EnvUtil.getProperty("server.port", Integer.class, 8848);
    this.localAddress = InetUtils.getSelfIP() + ":" + port;
    this.self = MemberUtil.singleParse(this.localAddress);
    this.self.setExtendVal(MemberMetaDataConstants.VERSION, VersionUtils.version);
    serverList.put(self.getAddress(), self);
    // register MembersChangeEvent publisher IPChangeEvent subscriber/publisher
    registerClusterEvent();
    // 初始化集群列表
    initAndStartLookup();
}

MemberLookup实现类有三个:

  • StandaloneMemberLookup:当节点以standalone形式启动,直接取自身作为集群列表。

  • FileConfigMemberLookup:取nacos.home/conf/cluster.conf配置文件中的内容作为集群列表。

ServerMemberManager会根据当前情况选择合适的MemberLookup实现,执行start方法。

// ServerMemberManager
private void initAndStartLookup() throws NacosException {
    this.lookup = LookupFactory.createLookUp(this);
    this.lookup.start();
}

FileConfigMemberLookup为例,start方法读取{nacos.home}/conf/cluster.conf,通过memberManager.memberChange(members)回调MemberManager。此外利用JDK的WatchService实现文件监听,当cluster.conf发生变化时会重新加载集群列表回调MemberManager。

// FileConfigMemberLookup
@Override
public void start() throws NacosException {
    if (start.compareAndSet(falsetrue)) {
        readClusterConfFromDisk();
        try {
            // 利用JDK的WatchService实现文件监听
            WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);
        } catch (Throwable e) {
            Loggers.CLUSTER.error("An exception occurred in the launch file monitor : {}", e.getMessage());
        }
    }
}
private void readClusterConfFromDisk() {
    // 1. 读取{nacos.home}/conf/cluster.conf,加载tmpMembers
    Collection<Member> tmpMembers = new ArrayList<>();
    try {
        List<String> tmp = EnvUtil.readClusterConf();
        tmpMembers = MemberUtil.readServerConf(tmp);
    } catch (Throwable e) {
        Loggers.CLUSTER
                .error("nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}", e.getMessage());
    }
    // 2. this.memberManager.memberChange(members)
    afterLookup(tmpMembers);
}

ServerMemberManager的memberChange方法更新内存中的nacos节点列表,发布MembersChangeEvent事件。

// 健康状态的节点地址集合
private volatile Set<String> memberAddressInfos = new ConcurrentHashSet<>();
// 所有nacos节点
private volatile ConcurrentSkipListMap<String, Member> serverList;
synchronized boolean memberChange(Collection<Member> members) {
    boolean isContainSelfIp = members.stream()
            .anyMatch(ipPortTmp -> Objects.equals(localAddress, ipPortTmp.getAddress()));
    if (isContainSelfIp) {
        isInIpList = true;
    } else {
        isInIpList = false;
        members.add(this.self);
    }

    boolean hasChange = members.size() != serverList.size();
    ConcurrentSkipListMap<String, Member> tmpMap = new ConcurrentSkipListMap<>();
    Set<String> tmpAddressInfo = new ConcurrentHashSet<>();
    for (Member member : members) {
        final String address = member.getAddress();

        if (!serverList.containsKey(address)) {
            hasChange = true;
        }

        tmpMap.put(address, member);
        if (NodeState.UP.equals(member.getState())) {
            tmpAddressInfo.add(address);
        }
    }
    serverList = tmpMap;
    memberAddressInfos = tmpAddressInfo;

    Collection<Member> finalMembers = allMembers();

    if (hasChange) {
        MemberUtil.syncToFile(finalMembers);
        Event event = MembersChangeEvent.builder().members(finalMembers).build();
        NotifyCenter.publishEvent(event);
    }

    return hasChange;
}

2、集群健康检查

ServerMemberManager实现了ApplicationListener接口,关注WebServerInitializedEvent事件。

Nacos源码(八)1.4.1注册中心服务端
ServerMemberManager监听Spring事件.png

Tomcat启动后会回调ServerMemberManager,开启一个MemberInfoReportTask当前节点信息广播任务。

// ServerMemberManager
// 集群成员信息广播任务
private final MemberInfoReportTask infoReportTask = new MemberInfoReportTask();
public void onApplicationEvent(WebServerInitializedEvent event) {
    getSelf().setState(NodeState.UP);
    if (!EnvUtil.getStandaloneMode()) {
        GlobalExecutor.scheduleByCommon(this.infoReportTask, 5_000L);
    }
    EnvUtil.setPort(event.getWebServer().getPort());
    EnvUtil.setLocalAddress(this.localAddress);
}

MemberInfoReportTask,每2s执行POST /v1/core/cluster/report,向集群随机节点(包含DOWN)发送当前节点信息(Member),一方面是为了同步当前节点信息,另一方面也是健康检查。

class MemberInfoReportTask extends Task {
    private final GenericType<RestResult<String>> reference = new GenericType<RestResult<String>>() {
    };
    private int cursor = 0;
    @Override
    protected void executeBody() {
        // 获取除当前节点以外其他所有节点,包含down的
        List<Member> members = ServerMemberManager.this.allMembersWithoutSelf();
        if (members.isEmpty()) {
            return;
        }
        // 轮询选择
        this.cursor = (this.cursor + 1) % members.size();
        Member target = members.get(cursor);
        // 调用/v1/core/cluster/report,将getSelf自己的Member信息传给对端
        final String url = HttpUtils
                .buildUrl(false, target.getAddress(), EnvUtil.getContextPath(), Commons.NACOS_CORE_CONTEXT, "/cluster/report");

        try {
            asyncRestTemplate
                    .post(url, Header.newInstance().addParam(Constants.NACOS_SERVER_HEADER, VersionUtils.version),
                            Query.EMPTY, getSelf(), reference.getType(), new Callback<String>() {
                                // 通讯成功
                                @Override
                                public void onReceive(RestResult<String> result) {
                                    if (result.ok()) {
                                        // 业务成功
                                        MemberUtil.onSuccess(ServerMemberManager.this, target);
                                    } else {
                                        // 业务失败
                                        MemberUtil.onFail(ServerMemberManager.this, target);
                                    }
                                }

                                // 通讯失败
                                @Override
                                public void onError(Throwable throwable) {
                                    MemberUtil.onFail(ServerMemberManager.this, target, throwable);
                                }
                            });
        } catch (Throwable ex) {
            Loggers.CLUSTER.error("failed to report new info to target node : {}, error : {}", target.getAddress(),
                    ExceptionUtil.getAllExceptionMsg(ex));
        }
    }

    // 2s执行一次
    @Override
    protected void after() {
        GlobalExecutor.scheduleByCommon(this2_000L);
    }
}

当前节点请求其他节点/v1/core/cluster/report成功(通讯和业务都成功),执行MemberUtil.onSuccess方法,将对端member设置为健康,并且触发MembersChangeEvent事件。

// MemberUtil
public static void onSuccess(final ServerMemberManager manager, final Member member) {
    final NodeState old = member.getState();
    manager.getMemberAddressInfos().add(member.getAddress());
    member.setState(NodeState.UP);
    member.setFailAccessCnt(0);
    if (!Objects.equals(old, member.getState())) {
        manager.notifyMemberChange();
    }
}

当前节点请求其他节点/v1/core/cluster/report失败(通讯或业务失败),执行MemberUtil.onFail方法,首先设置member为SUSPICIOUS状态,如果连续失败超过3次或是connection refused状态,会设置member为DOWN状态。偶尔几次健康检查失败,不会导致member直接标记为DOWN不可用,SUSPICIOUS状态仍然会参与Distro协议,负责部分注册中心的写请求(见后面分析DistroFilter)。最后触发MembersChangeEvent事件。

public static void onFail(final ServerMemberManager manager, final Member member, Throwable ex) {
    manager.getMemberAddressInfos().remove(member.getAddress());
    final NodeState old = member.getState();
    // 偶尔失败,设置为SUSPICIOUS中间状态,介于UP与DOWN之间,允许参与Distro协议,认为是健康节点
    member.setState(NodeState.SUSPICIOUS);
    member.setFailAccessCnt(member.getFailAccessCnt() + 1);
    // 超过连续三次失败 或 connection refused 设置节点为DOWN
    int maxFailAccessCnt = EnvUtil.getProperty("nacos.core.member.fail-access-cnt", Integer.class, 3);

    if (member.getFailAccessCnt() > maxFailAccessCnt || StringUtils
            .containsIgnoreCase(ex.getMessage(), TARGET_MEMBER_CONNECT_REFUSE_ERRMSG)) {
        member.setState(NodeState.DOWN);
    }
    if (!Objects.equals(old, member.getState())) {
        manager.notifyMemberChange();
    }
}

接下来看看被健康检查的对端节点如何处理/v1/core/cluster/report请求。

调用MemberManager的update方法。

// NacosClusterController
@PostMapping(value = {"/report"})
public RestResult<String> report(@RequestBody Member node) {
    if (!node.check()) {
        return RestResultUtils.failedWithMsg(400"Node information is illegal");
    }
    node.setState(NodeState.UP);
    node.setFailAccessCnt(0);
    boolean result = memberManager.update(node);
    return RestResultUtils.success(Boolean.toString(result));
}

MemberManager更新serverList中的Member信息,并发布MembersChangeEvent事件。这个健康检查是双向的,无论请求节点还是响应节点,都会更新内存中的集群节点状态。

// MemberManager
public boolean update(Member newMember) {
    String address = newMember.getAddress();
    // 不在配置文件中的member不会加入集群
    if (!serverList.containsKey(address)) {
        return false;
    }
    serverList.computeIfPresent(address, (s, member) -> {
        if (NodeState.DOWN.equals(newMember.getState())) {
            memberAddressInfos.remove(newMember.getAddress());
        }
        boolean isPublishChangeEvent = MemberUtil.isBasicInfoChanged(newMember, member);
        newMember.setExtendVal(MemberMetaDataConstants.LAST_REFRESH_TIME, System.currentTimeMillis());
        MemberUtil.copy(newMember, member);
        if (isPublishChangeEvent) {
            notifyMemberChange();
        }
        return member;
    });
    return true;
}

总结来说,每个nacos节点会每隔2s轮询选择其他节点,上报自己的节点信息,将双方serverList中的Member信息更新。如果对端健康检查失败,对端节点标记为SUSPICIOUS,表示对端可能下线;当连续超过3次健康检查失败,会标记为对端节点为DOWN。此外,这个健康检查是双向的,每个节点都会主动发起健康检查,也会被动接收健康检查。

四、服务订阅/查询

GET /nacos/v1/ns/instance/list 服务订阅/查询,逻辑在com.alibaba.nacos.naming.controllers.InstanceController#doSrvIpxt方法中。

订阅与普通查询的区别是客户端传来的udpPort是否等于0,如果等于0,表示仅查询,如果大于0表示订阅。

public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
            int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly)
 throws Exception 
{
  ObjectNode result = JacksonUtils.createEmptyJsonNode();
  // 1. 定位Service 根据namespace + groupName@@serviceName获取Service
  Service service = serviceManager.getService(namespaceId, serviceName);
  long cacheMillis = switchDomain.getDefaultCacheMillis();
  try {
    // udp推送服务新增一个客户端
    if (udpPort > 0 && pushService.canEnablePush(agent)) {
      pushService
        .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
                   pushDataSource, tid, app);
      cacheMillis = switchDomain.getPushCacheMillis(serviceName); // 10s
    }
  } catch (Exception e) {
    Loggers.SRV_LOG.error();
    cacheMillis = switchDomain.getDefaultCacheMillis();
  }
  if (service == null) {
    // ...
    result.replace("hosts", JacksonUtils.createEmptyArrayNode());
    return result;
  }
  // service.enabled=false抛出异常
  checkIfDisabled(service);

  // 2. Service定位Instance
  List<Instance> srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
  if (CollectionUtils.isEmpty(srvedIPs)) {
    // ...
    result.set("hosts", JacksonUtils.createEmptyArrayNode());
    return result;
  }
  // 对于instance分组,健康和非健康
  Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
  ipMap.put(Boolean.TRUE, new ArrayList<>());
  ipMap.put(Boolean.FALSE, new ArrayList<>());
  for (Instance ip : srvedIPs) {
    ipMap.get(ip.isHealthy()).add(ip);
  }
  // 3. 保护模式
  double threshold = service.getProtectThreshold();
  if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {
    ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
    ipMap.get(Boolean.FALSE).clear();
  }

  // 4. 结果组装
  ArrayNode hosts = JacksonUtils.createEmptyArrayNode();

  for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
    List<Instance> ips = entry.getValue();
    if (healthyOnly && !entry.getKey()) {
      continue;
    }
    for (Instance instance : ips) {
      if (!instance.isEnabled()) {
        continue;
      }
      ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();
      ipObj.put("ip", instance.getIp());
      ipObj.put("port", instance.getPort());
      // ...
      hosts.add(ipObj);
    }
  }
  result.replace("hosts", hosts);
  // ...
  return result;
}

这段查询逻辑有点长,主要逻辑是根据namespace和group和service定位到Service实例,再根据clustername定位到Cluster,返回Cluster中的Instance列表。

// ServiceManager.java
// namespace - groupName@@serviceName - Service
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
public Service getService(String namespaceId, String serviceName) {
    if (serviceMap.get(namespaceId) == null) {
        return null;
    }
    return chooseServiceMap(namespaceId).get(serviceName);
}
public Map<String, Service> chooseServiceMap(String namespaceId) {
      return serviceMap.get(namespaceId);
}
// Service.java
// Cluster注册表,key是集群名称
private Map<String, Cluster> clusterMap = new HashMap<>();
public List<Instance> srvIPs(List<String> clusters) {
  if (CollectionUtils.isEmpty(clusters)) {
    clusters = new ArrayList<>();
    clusters.addAll(clusterMap.keySet());
  }
  return allIPs(clusters);
}
public List<Instance> allIPs(List<String> clusters) {
  List<Instance> result = new ArrayList<>();
  for (String cluster : clusters) {
    Cluster clusterObj = clusterMap.get(cluster);
    if (clusterObj == null) {
      continue;
    }
    result.addAll(clusterObj.allIPs());
  }
  return result;
}

客户端注册UDP监听

InstanceController#doSrvIpxt无论客户端查询的服务是否存在,都会向服务端的PushService中注册一个监听。当服务发生变化时,服务端会通过udp协议推送至客户端。这里的udpPort是客户端的udp端口号,由客户端在发起查询时传入,见上一章。

@Autowired
private PushService pushService;
// InstanceController#doSrvIpxt
try {
  // udp推送服务新增一个客户端
  if (udpPort > 0 && pushService.canEnablePush(agent)) {
    pushService
      .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
                 pushDataSource, tid, app);
    cacheMillis = switchDomain.getPushCacheMillis(serviceName); // 10s
  }
catch (Exception e) {
  cacheMillis = switchDomain.getDefaultCacheMillis();
}
@Component
public class PushService implements ApplicationContextAwareApplicationListener<ServiceChangeEvent{
    // 第一个key是namespace+groupService 第二个key是PushClient.toString
    private static ConcurrentMap<String, ConcurrentMap<String, PushClient>> clientMap = new ConcurrentHashMap<>();
}

另外InstanceController#doSrvIpxt控制客户端的拉取服务注册表的时间间隔为cacheMillis=10s,见上一章客户端服务发现UpdateTask。

保护模式

InstanceController#doSrvIpxt在处理Instance列表时有一个常见的逻辑操作。就是当某个服务下的实例大量下线(Instance.healthy=false)时,会开启保护模式,认为是服务端自己发生了网络分区,将所有实例认为是健康状态返回给客户端。这是AP模式注册中心的一个代表性功能,如Eureka。

何为大量?

每个Service实例中维护一个protectThreshold用于计算是否是大量服务下线,默认是0。

Nacos源码(八)1.4.1注册中心服务端
protectThreshold.png
public class Service implements Serializable {
    // 服务保护阈值,当大多数服务下线,认为当前注册中心节点发生故障,返回所有实例,包括非健康实例
    private float protectThreshold = 0.0F;
}

当存活实例(ipMap.get(Boolean.TRUE).size())/总实例(srvedIPs.size)<= protectThreshold时,认为注册中心发生故障,进入保护模式,返回服务下所有实例。当默认配置为0时,如果某个服务下所有实例都无法与Nacos通讯,会返回该服务下所有实例。

// InstanceController#doSrvIpxt
// 对于instance分组,健康和非健康
Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
ipMap.put(Boolean.TRUE, new ArrayList<>());
ipMap.put(Boolean.FALSE, new ArrayList<>());
for (Instance ip : srvedIPs) {
  ipMap.get(ip.isHealthy()).add(ip);
}
// 3. 保护模式
double threshold = service.getProtectThreshold();
if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {
  ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
  ipMap.get(Boolean.FALSE).clear();
}

五、写请求

对于客户端的写请求(如服务注册),对于客户端来说服务端是对等的,请求任何一个节点都可以正常响应。

但是对于服务端来说,并非所有写请求都由当前节点处理。

Nacos源码(八)1.4.1注册中心服务端
DistroFilter.png

如/v1/ns/instance处理客户端服务注册,方法被CanDistro注解,此类方法都会经过DistroFilter。

// InstanceController
@CanDistro
@PostMapping
public String register(HttpServletRequest request) throws Exception {
   //...
}

ControllerMethodsCache根据请求路径、请求方法、请求参数,找到RequestMapping注解的方法返回给DistroFilter。DistroMapper会根据服务名(groupName@@serviceName)定位到责任节点。

如果当前节点是责任节点,那么继续执行后续逻辑;否则当前节点会将写请求转发至责任节点处理,然后用责任节点的响应报文来响应客户端。

public class DistroFilter implements Filter {
    @Autowired
    private DistroMapper distroMapper;
    @Autowired
    private ControllerMethodsCache controllerMethodsCache;

    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
            throws IOException, ServletException 
{
        ReuseHttpRequest req = new ReuseHttpServletRequest((HttpServletRequest) servletRequest);
        HttpServletResponse resp = (HttpServletResponse) servletResponse;
        try {
            String path = new URI(req.getRequestURI()).getPath();
            String serviceName = req.getParameter(CommonParams.SERVICE_NAME);
            // 根据请求找到方法
            Method method = controllerMethodsCache.getMethod(req);
            // serviceName版本适配,使用groupName@@serviceName
            String groupName = req.getParameter(CommonParams.GROUP_NAME);
            String groupedServiceName = ...;

            // 如果被CanDistro注解,且当前节点不负责groupedServiceName
            if (method.isAnnotationPresent(CanDistro.class) && !distroMapper.responsible(groupedServiceName)) {

                String userAgent = req.getHeader(HttpHeaderConsts.USER_AGENT_HEADER);

                if (StringUtils.isNotBlank(userAgent) && userAgent.contains(UtilsAndCommons.NACOS_SERVER_HEADER)) {
                    resp.sendError(HttpServletResponse.SC_BAD_REQUEST,
                            "receive invalid redirect request from peer " + req.getRemoteAddr());
                    return;
                }
                                // 获取实际负责该服务的目标节点
                final String targetServer = distroMapper.mapSrv(groupedServiceName);
                                // 组装请求参数
                List<String> headerList = new ArrayList<>(16);
                Enumeration<String> headers = req.getHeaderNames();
                while (headers.hasMoreElements()) {
                    String headerName = headers.nextElement();
                    headerList.add(headerName);
                    headerList.add(req.getHeader(headerName));
                }

                final String body = IoUtils.toString(req.getInputStream(), Charsets.UTF_8.name());
                final Map<String, String> paramsValue = HttpClient.translateParameterMap(req.getParameterMap());

                // 请求实际责任节点
                RestResult<String> result = HttpClient
                        .request("http://" + targetServer + req.getRequestURI(), headerList, paramsValue, body,
                                PROXY_CONNECT_TIMEOUT, PROXY_READ_TIMEOUT, Charsets.UTF_8.name(), req.getMethod());
                String data = result.ok() ? result.getData() : result.getMessage();
                try {
                    // 取负责节点的响应报文响应客户端
                    WebUtils.response(resp, data, result.getCode());
                } catch (Exception ignore) {
                    Loggers.SRV_LOG.warn("[DISTRO-FILTER] request failed: " + distroMapper.mapSrv(groupedServiceName)
                            + urlString);
                }
            } else {
                // 当前节点负责groupedServiceName,直接处理
                OverrideParameterRequestWrapper requestWrapper = OverrideParameterRequestWrapper.buildRequest(req);
                requestWrapper.addParameter(CommonParams.SERVICE_NAME, groupedServiceName);
                filterChain.doFilter(requestWrapper, resp);
            }
        } catch (AccessControlException e) {
            resp.sendError(HttpServletResponse.SC_FORBIDDEN, "access denied: " + ExceptionUtil.getAllExceptionMsg(e));
        } catch (NoSuchMethodException e) {
            resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED,
                    "no such api:" + req.getMethod() + ":" + req.getRequestURI());
        } catch (Exception e) {
            resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
                    "Server failed," + ExceptionUtil.getAllExceptionMsg(e));
        }
    }
}

重点在于DistroMapper如何分配哪个节点负责哪些服务。

@Component("distroMapper")
public class DistroMapper extends MemberChangeListener {
    // 健康节点
    private volatile List<String> healthyList = new ArrayList<>();
    // 开关服务
    private final SwitchDomain switchDomain;
    // 节点管理
    private final ServerMemberManager memberManager;
}

DistroMapper内部又维护了一个集群健康节点列表,当收到MembersChangeEvent事件时,会更新这个列表。根据第四章节的集群管理,当集群节点健康状况发生变更时,都会触发MembersChangeEvent事件。

关注onEvent方法,这里会过滤出UP和SUSPICIOUS状态的节点作为Distro协议认为的健康节点。

//DistroMapper
public void onEvent(MembersChangeEvent event) {
    List<String> list = MemberUtil.simpleMembers(MemberUtil.selectTargetMembers(event.getMembers(),
            member -> NodeState.UP.equals(member.getState()) || NodeState.SUSPICIOUS.equals(member.getState())));
    Collections.sort(list);
    Collection<String> old = healthyList;
    healthyList = Collections.unmodifiableList(list);
}

如果hash(serviceName) % healthList.size == 当前节点所处healthList下标,则认为当前节点是负责这个service的节点。这里不是很明白为什么要indexOf+lastIndexOf共同判断,直接indexOf == target不行吗。

//DistroMapper
// 健康节点
private volatile List<String> healthyList = new ArrayList<>();
public boolean responsible(String serviceName) {
    final List<String> servers = healthyList;
    // 如果关闭distro协议 或 standalone启动 认为当前节点可以负责写请求
    if (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {
        return true;
    }
    if (CollectionUtils.isEmpty(servers)) {
        return false;
    }
    // 当前节点所处servers下标
    int index = servers.indexOf(EnvUtil.getLocalAddress());
    int lastIndex = servers.lastIndexOf(EnvUtil.getLocalAddress());
    if (lastIndex < 0 || index < 0) {
        return true;
    }
    // 哈希%servers大小
    int target = distroHash(serviceName) % servers.size();
    return target >= index && target <= lastIndex;
}
private int distroHash(String serviceName) {
  return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
}
//DistroMapper
// 健康节点
private volatile List<String> healthyList = new ArrayList<>();
public String mapSrv(String serviceName) {
    final List<String> servers = healthyList;
    if (CollectionUtils.isEmpty(servers) || !switchDomain.isDistroEnabled()) {
        return EnvUtil.getLocalAddress();
    }
    try {
        int index = distroHash(serviceName) % servers.size();
        return servers.get(index);
    } catch (Throwable e) {
        return EnvUtil.getLocalAddress();
    }
}

六、服务注册

有了上述铺垫,看一下服务注册的逻辑,POST /v1/ns/instance。

// InstanceController
@CanDistro
@PostMapping
public String register(HttpServletRequest request) throws Exception {
    final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    final Instance instance = parseInstance(request);
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}

ServiceManager注册实例分为两步,一步是确保Service存在,第二步是将Instance加入Service。

// ServiceManager
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // 1. 如果首次注册,才会执行,创建Service,放入ServiceManager管理
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    // 获取Service实例
    Service service = getService(namespaceId, serviceName);
    // 2. 把Instance加入Service
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
Nacos源码(八)1.4.1注册中心服务端
服务注册-流程.png

创建服务

因为服务实例Instance在服务Service下面维护,优先确保ServiceManager的serviceMap中存在Service。

// ServiceManager
// namespace - groupName@@serviceName - Service
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
    createServiceIfAbsent(namespaceId, serviceName, local, null);
}
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
        throws NacosException 
{
    Service service = getService(namespaceId, serviceName);
    // serviceMap还没有对应service实例,才执行后续逻辑
    if (service == null) {
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        // ...
                // 核心逻辑
        putServiceAndInit(service);
         // 非临时节点,持久化Service;临时节点不会持久化Service
        if (!local) {
            addOrReplaceService(service);
        }
    }
}

忽略local=false的非临时节点逻辑,重点关注putServiceAndInit方法。这里主要是将service写入serviceMap(内部还没有Instance),执行Service的init方法开启服务对应客户端心跳检测,最后ConsistencyService同时监听了这个服务的临时和非临时节点。

// ServiceManager
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
private void putServiceAndInit(Service service) throws NacosException {
    // 1. 写入serviceMap内存
    putService(service);
    // 2. 开启客户端心跳检测
    service.init();
    // 3. 监听
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
}

客户端心跳检测后续再说,看看这个监听做了什么。

ConsistencyService一致性服务,定义了一些KV存储的常用功能,包括增删改查和监听。

public interface ConsistencyService {
    void put(String key, Record value) throws NacosException;
    void remove(String key) throws NacosException;
    Datum get(String key) throws NacosException;
    void listen(String key, RecordListener listener) throws NacosException;
    void unListen(String key, RecordListener listener) throws NacosException;
    boolean isAvailable();
}

ConsistencyService的实现类分为两类。

一类是代理类,会根据key的pattern决定实际使用哪个ConsistencyService实现类处理。如DelegateConsistencyServiceImpl根据key是否匹配临时节点的pattern,决定走临时节点实现还是持久节点实现。

@Service("consistencyDelegate")
public class DelegateConsistencyServiceImpl implements ConsistencyService {
    // Raft CP PersistentServiceProcessor
    private final PersistentConsistencyServiceDelegateImpl persistentConsistencyService;
    // Distro AP DistroConsistencyServiceImpl
    private final EphemeralConsistencyService ephemeralConsistencyService;
    @Override
    public void put(String key, Record value) throws NacosException {
        mapConsistencyService(key).put(key, value);
    }
    @Override
    public void listen(String key, RecordListener listener) throws NacosException {
        //...
        mapConsistencyService(key).listen(key, listener);
    }
        // ...
    private ConsistencyService mapConsistencyService(String key) {
        return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
    }
}

另一类就是真实的实现类。

PersistentServiceProcessor是基于JRaft实现的一致性服务,之前看配置中心的时候知道,分为写一致和读一致(线性一致)。对于注册中心来说,如果是持久节点会走Raft一致性服务。

// PersistentServiceProcessor
@Override
public void put(String key, Record value) throws NacosException {
    final BatchWriteRequest req = new BatchWriteRequest();
    Datum datum = Datum.createDatum(key, value);
    req.append(ByteUtils.toBytes(key), serializer.serialize(datum));
    final WriteRequest request = WriteRequest.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req)))
            .setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Write.desc).build();
    try {
        protocol.write(request);
    } catch (Exception e) {
        throw new NacosException(ErrorCode.ProtoSubmitError.getCode(), e.getMessage());
    }
}
@Override
public Datum get(String key) throws NacosException {
  final List<byte[]> keys = new ArrayList<>(1);
  keys.add(ByteUtils.toBytes(key));
  final ReadRequest req = ReadRequest.newBuilder().setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP)
    .setData(ByteString.copyFrom(serializer.serialize(keys))).build();
  try {
    Response resp = protocol.getData(req);
    if (resp.getSuccess()) {
      BatchReadResponse response = serializer
        .deserialize(resp.getData().toByteArray(), BatchReadResponse.class);
      final List<byte[]> rValues = response.getValues();
      return rValues.isEmpty() ? null : serializer.deserialize(rValues.get(0), getDatumTypeFromKey(key));
    }
    throw new NacosException(ErrorCode.ProtoReadError.getCode(), resp.getErrMsg());
  } catch (Throwable e) {
    throw new NacosException(ErrorCode.ProtoReadError.getCode(), e.getMessage());
  }
}

DistroConsistencyServiceImpl基于Distro协议,如果是临时节点会走这个一致性服务,只会将数据存储在内存中。暂时先只看监听逻辑,这里传入的listener就是Service,Service实现了RecordListener接口。

// DistroConsistencyServiceImpl
private Map<String, ConcurrentLinkedQueue<RecordListener>> listeners = new ConcurrentHashMap<>();
@Override
public void listen(String key, RecordListener listener) throws NacosException {
  if (!listeners.containsKey(key)) {
    listeners.put(key, new ConcurrentLinkedQueue<>());
  }
  if (listeners.get(key).contains(listener)) {
    return;
  }
  listeners.get(key).add(listener);
}

至此创建Service的流程走完了,主要是创建Service实例写入ServiceManager的内存map,Service开启客户端心跳检测,最后在DistroConsistencyServiceImpl注册服务实例变化的监听。

注册实例

服务注册的第二步是更新Service内部的Instance列表,将新的实例加入Instance列表。

// ServiceManager
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException 
{
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    Service service = getService(namespaceId, serviceName);
    synchronized (service) {
        // 从底层存储获取当前Instance列表,将新加入的Instance加入并返回
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        // 写入底层存储
        consistencyService.put(key, instances);
    }
}

重点看consistencyService.put的实现。关注临时节点注册,这里的实现类是DistroConsistencyServiceImpl。

// DistroConsistencyServiceImpl
public void put(String key, Record value) throws NacosException {
    // 写入数据
    onPut(key, value);
    // 将写入数据,同步至所有Member
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2);
}

本地更新

首先onPut方法将数据写入底层存储,dataStore是一个基于内存的kv存储,Datum封装了kv结构,以服务为key,以服务下所有Instance为value,写入kv存储。之后通过Notifier提交了一个任务,用于通知实例服务实例变更。

// DistroConsistencyServiceImpl
private volatile Notifier notifier = new Notifier();
private final DataStore dataStore;
public void onPut(String key, Record value) {
    // 1. 如果是临时节点,写入内存map
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        dataStore.put(key, datum);
    }
    if (!listeners.containsKey(key)) {
        return;
    }
    // 2. 新增key变更任务,后续通知监听器
    notifier.addTask(key, DataOperation.CHANGE);
}

Notifier是一个简单的生产消费模型实现Runnable接口,将发生变化的服务,调用对应RecordListener。

// DistroConsistencyServiceImpl.Notifier
// 任务队列
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
// 生产任务
public void addTask(String datumKey, DataOperation action) {
  // ...
  tasks.offer(Pair.with(datumKey, action));
}
// 消费任务
@Override
public void run() {
  for (; ; ) {
    Pair<String, DataOperation> pair = tasks.take();
    handle(pair);
  }
}
// 调用Listener
private void handle(Pair<String, DataOperation> pair) {
    String datumKey = pair.getValue0();
    DataOperation action = pair.getValue1();
    for (RecordListener listener : listeners.get(datumKey)) {
      if (action == DataOperation.CHANGE) {
        listener.onChange(datumKey, dataStore.get(datumKey).value);
      }
      if (action == DataOperation.DELETE) {
        listener.onDelete(datumKey);
      }
    }
}

Service 实现RecordListener接口,当底层存储的Instance发生变更了,这里都会收到回调。更新内存中ClusterMap,并将变更后的自己的信息通过UDP推送给监听当前服务的所有客户端。

// Service
public void onChange(String key, Instances value) throws Exception {
    // ...
    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
    // ...
}
// Cluster注册表,key是集群名称
private Map<String, Cluster> clusterMap = new HashMap<>();
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
  Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
  for (Instance instance : instances) {
    // ...
  }
  for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
    List<Instance> entryIPs = entry.getValue();
    clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
  }
  setLastModifiedMillis(System.currentTimeMillis());
  // UDP将Service变更推送给客户端
  getPushService().serviceChanged(this);
}

UDP推送客户端的逻辑比较简单,参考上一章的客户端接收UDP推送逻辑 & 本章第二节服务查询时注册UDP监听客户端逻辑即可,直接跳过。

集群数据同步

DistroConsistencyServiceImpl写入数据到底层存储后,将写入的数据延迟1s(nacos.naming.distro.taskDispatchPeriod/2=2s/2=1s)推送给集群中所有节点。(这意味着,客户端感知到服务注册表变更后,如果立即向集群中其他节点查询注册表,可能返回不一致数据)

// DistroConsistencyServiceImpl
public void put(String key, Record value) throws NacosException {
    // 写入数据
    onPut(key, value);
    // 将写入数据,同步至所有Member
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2);
}

DistroProtocol循环节点中所有Member(包括不健康的),针对每个集群节点提交一个DistroDelayTask延迟任务。

// DistroProtocol
public void sync(DistroKey distroKey, DataOperation action, long delay) {
    for (Member each : memberManager.allMembersWithoutSelf()) {
        DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                each.getAddress());
        DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
    }
}

后续流程比较长,主要是通过key重新查询底层存储获取最新数据后,调用每个节点的/v1/ns/distro/datum接口。

每个节点接收数据同步请求,最终是调用DistroConsistencyServiceImpl的processData方法,转换参数后还是调用了本地更新方法onPut,参考上一小节。

// DistroConsistencyServiceImpl
public boolean processData(DistroData distroData) {
    DistroHttpData distroHttpData = (DistroHttpData) distroData;
    Datum<Instances> datum = (Datum<Instances>) distroHttpData.getDeserializedContent();
    onPut(datum.key, datum.value);
    return true;
}

七、客户端心跳

心跳超时检测任务

服务注册时,每个Service都会开启一个定时任务,用于检测当前服务下的Instance是否按时发送过心跳。定时任务在Service的init方法调用时开启,每5s执行一次。

// Service
private ClientBeatCheckTask clientBeatCheckTask = new ClientBeatCheckTask(this);
// Cluster注册表,key是集群名称
private Map<String, Cluster> clusterMap = new HashMap<>();
public void init() {
    // 提交客户端心跳检测任务 每5s执行一次
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    // ...
}

ClientBeatCheckTask客户端心跳超时检测任务,循环所有临时节点,如果超过15秒没有收到心跳,标记Instance为非健康(注意DataStore没有更新);如果超过30秒没有收到心跳,调用当前节点DELETE /v1/ns/instance删除服务下的这个实例。(DELETE /v1/ns/instance流程和注册实例相反,但是也是更新Service下的Instance)

public class ClientBeatCheckTask implements Runnable {
    private Service service;
    @Override
    public void run() {
        try {
            // distro 如果当前节点不负责这个service,不处理
            if (!getDistroMapper().responsible(service.getName())) {
                return;
            }
            if (!getSwitchDomain().isHealthCheckEnabled()) {
                return;
            }
            // 所有临时实例
            List<Instance> instances = service.allIPs(true);
            // 超过15s没收到心跳,标记为非健康
            for (Instance instance : instances) {

                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    if (!instance.isMarked()) {
                        if (instance.isHealthy()) {
                            instance.setHealthy(false);
                            // UDP推送客户端
                            getPushService().serviceChanged(service);
                        }
                    }
                }
            }
            if (!getGlobalConfig().isExpireInstance()) {
                return;
            }
            // 超过30s没收到心跳,直接从注册表中删除
            for (Instance instance : instances) {
                if (instance.isMarked()) {
                    continue;
                }
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // 走http调用当前节点 DELETE /v1/ns/instance
                    deleteIp(instance);
                }
            }

        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }
    }
}

心跳超时的两个阈值是从Instance的metadata中来的,preserved.heart.beat.timeout(默认15s)和preserved.ip.delete.timeout(默认30s),单位都是毫秒。

// Instance
public long getInstanceHeartBeatTimeOut() {
    return getMetaDataByKeyWithDefault(PreservedMetadataKeys.HEART_BEAT_TIMEOUT,
            Constants.DEFAULT_HEART_BEAT_TIMEOUT);
}

public long getIpDeleteTimeout() {
    return getMetaDataByKeyWithDefault(PreservedMetadataKeys.IP_DELETE_TIMEOUT,
            Constants.DEFAULT_IP_DELETE_TIMEOUT);
}

实例元数据可以在服务注册的时候设置,也可以在控制台设置,可以是json格式,也可以是k1=v1,k2=v2形式。

Nacos源码(八)1.4.1注册中心服务端
控制台-实例元数据设置.png

处理客户端心跳

客户端心跳PUT /nacos/v1/ns/instance/beat。由于客户端心跳是个写操作(更新内存中的实例上次心跳时间),所以被@CanDistro注解,由集群中责任节点处理。

// com.alibaba.nacos.naming.controllers.InstanceController
@CanDistro
@PutMapping("/beat")
public ObjectNode beat(HttpServletRequest request) throws Exception {
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    // 默认控制客户端心跳是5秒
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
    // 从请求获取clientBeat,namespace,serviceName,clusterName...
       String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    RsInfo clientBeat = null;
    if (StringUtils.isNotBlank(beat)) {
      clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
    }
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
    // 1. 如果服务没注册,执行注册逻辑
    if (instance == null) {
        // 如果lightBeatEnabled=true且发送心跳时客户端没有注册,需要客户端发起注册
        if (clientBeat == null) {
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }
        // 服务端注册逻辑
        instance = new Instance();
        instance.setPort(clientBeat.getPort());
        instance.setIp(clientBeat.getIp());
        // ...
        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }
    Service service = serviceManager.getService(namespaceId, serviceName);
    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
       // ...
    }
    // 2. 更新instance健康状态
    service.processClientBeat(clientBeat);
    result.put(CommonParams.CODE, NamingResponseCode.OK);
    // 如果instance有设置心跳间隔preserved.heart.beat.interval,优先使用instance设置的心跳间隔
    if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
    }
    // 服务端控制是否允许light beat, 默认true
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}

客户端发送心跳时,服务端没有收到客户端服务注册请求怎么处理?

回顾上一章,当客户端发送心跳后,判断服务端响应code=NamingResponseCode.RESOURCE_NOT_FOUND时,会主动发起一次注册请求。

但是这取决于lightBeatEnabled。

lightBeatEnabled=false,表示不允许light beat,需要客户端发送全量的心跳信息。当服务端发现客户端尚未注册时,会使用请求参数中的beat反序列化为RsInfo,代替客户端进行注册。

lightBeatEnabled=true,是默认选项,表示客户端不会发送完整的心跳信息。当服务端发现客户端尚未注册时会返回code=NamingResponseCode.RESOURCE_NOT_FOUND。

总结,如上一章所述,默认情况下客户端发送心跳时,服务端没有收到客户端服务注册请求,需要客户端发起注册请求。

心跳间隔到底是多少?

上一章提到过,默认情况下客户端每5s发起一次心跳请求。

从服务端看,心跳间隔和心跳超时阈值一样,可以通过配置Instance元数据控制,key=preserved.heart.beat.interval,服务端的默认时长也是5s。

// Instance
public long getInstanceHeartBeatInterval() {
    return getMetaDataByKeyWithDefault(PreservedMetadataKeys.HEART_BEAT_INTERVAL,
            Constants.DEFAULT_HEART_BEAT_INTERVAL);
}

如何处理心跳?

客户端心跳交由Service处理,传入RsInfo包含客户端ip、port、cluster等关键信息。

// Service
public void processClientBeat(final RsInfo rsInfo) {
    ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
    clientBeatProcessor.setService(this);
    clientBeatProcessor.setRsInfo(rsInfo);
    HealthCheckReactor.scheduleNow(clientBeatProcessor);
}

提交一个ClientBeatProcessor任务异步立即执行,更新心跳对应Instance的健康状况和上次心跳时间。如果健康状况变更,udp通知有监听的客户端。

// ClientBeatProcessor
public void run() {
  Service service = this.service;
  String ip = rsInfo.getIp();
  String clusterName = rsInfo.getCluster();
  int port = rsInfo.getPort();
  Cluster cluster = service.getClusterMap().get(clusterName);
  // 获取所有临时节点
  List<Instance> instances = cluster.allIPs(true);
  for (Instance instance : instances) {
    // 找到心跳对应实例
    if (instance.getIp().equals(ip) && instance.getPort() == port) {
      // 更新上次心跳时间
      instance.setLastBeat(System.currentTimeMillis());
      if (!instance.isMarked()) {
        // 更新健康状况
        if (!instance.isHealthy()) {
          instance.setHealthy(true);
          // udp推送客户端
          getPushService().serviceChanged(service);
        }
      }
    }
  }
}

注意到这里并没有用ConsistencyService更新底层存储的Instance(注意DataStore没有更新)。

八、集群数据同步

无论是服务注册POST /v1/ns/instance还是服务注销DELETE /v1/ns/instance,责任节点都会异步将变更注册信息同步至其他非责任节点。

但是在心跳的处理上,实例健康状况变更,集群间数据并没有同步(30s心跳超时摘除Instance也走了DELETE /v1/ns/instance,这个情况除外)。甚至责任节点的ServiceManager与DataStore中Instance的健康状态都不一致。这是为什么?

对于集群数据不一致。15s超时表示短暂网络抖动,不认为服务实例真正下线了,暂时不需要同步给其他节点,减少频繁数据同步。只有当30s超时,认为实例真的下线了,从Service里真实摘除后,通过DELETE /v1/ns/instance真实执行服务注销流程(更新ServiceManager/DataStore/集群同步)。

对于节点内部数据不一致。所有节点对外提供的查询服务接口,都是走的ServiceManager,Service里的Instance是会设置健康状态为false的,对客户端来说是正确的。DataStore属于Nacos内部逻辑,集群数据同步用的。

另外,为了保证所有节点的数据一致,实际上在DistroProtocol构造时,会提交一个定时任务DistroVerifyTask,责任节点每5s向其他节点同步全量同步自己负责的服务信息。这个DistroData的类型是VERIFY,数据只包含服务包含的Instance列表的摘要信息(MD5)。

public class DistroVerifyTask implements Runnable {
    private final ServerMemberManager serverMemberManager;
    private final DistroComponentHolder distroComponentHolder;

    @Override
    public void run() {
        List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
        for (String each : distroComponentHolder.getDataStorageTypes()) {
          verifyForDataStorage(each, targetServer);
        }
    }

    private void verifyForDataStorage(String type, List<Member> targetServer) {
        DistroData distroData = distroComponentHolder.findDataStorage(type).getVerifyData();
        distroData.setType(DataOperation.VERIFY);
        for (Member member : targetServer) {
          distroComponentHolder.findTransportAgent(type).syncVerifyData(distroData, member.getAddress());
        }
    }
}

其他非责任节点通过PUT /v1/ns/distro/checksum接收VERIFY Distro数据。

非责任节点DistroConsistencyServiceImpl#onReceiveChecksums结合当前节点DataStore中的数据,比对出需要更新和需要删除的服务。

对需要删除的服务,从DataSore和ServiceManager中删除。

对需要更新的服务,需要调用GET /v1/ns/distro/datum反查查询责任节点获取服务对应注册表信息(从DataStore中查询),更新DataStore和ServiceManager中的注册信息。

// DistroConsistencyServiceImpl
public void onReceiveChecksums(Map<String, String> checksumMap, String server) {
  // 根据责任节点发来的数据,结合自己DataStore里的数据分组
  // 待更新的service
  List<String> toUpdateKeys = new ArrayList<>();
  // 待删除的service
  List<String> toRemoveKeys = new ArrayList<>();

  // 删除服务 dataStore&ServiceManager
  for (String key : toRemoveKeys) {
    onRemove(key);
  }

  // 更新服务,二次请求GET /v1/ns/distro/datum获取,更新dataStore&ServiceManager内存注册表
  DistroHttpCombinedKey distroKey = new DistroHttpCombinedKey(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, server);
  distroKey.getActualResourceTypes().addAll(toUpdateKeys);
  DistroData remoteData = distroProtocol.queryFromRemote(distroKey);
  if (null != remoteData) {
    processData(remoteData.getContent());
  }
}

总结

  • 服务端模型主要包括:ServiceManager管理namespace+group+service到Service实例的映射关系;Service管理服务与Cluster集群的映射关系;Cluster管理其下面的持久/临时Instance列表。

Nacos源码(八)1.4.1注册中心服务端
命名服务实现模型.png
  • Nacos集群管理:Nacos集群通过MemberLoopup初始化,一般可以用nacos.home/cluster/cluster.conf配置文件的方式初始化。每个节点每2s执行POST /v1/core/cluster/report,向集群随机节点(包含DOWN)发送当前节点信息,一方面是为了同步当前节点信息,另一方面也是健康检查。

    健康检查是双向的,每个节点都会主动发起健康检查,也会被动接收健康检查。如果健康检查失败,对端节点标记为SUSPICIOUS,表示对端可能下线,但是可以参与Distro协议负责处理写请求;当连续超过3次健康检查失败,会标记为对端节点为DOWN。

  • Distro写请求:对于客户端写请求,如服务注册、客户端心跳,DistroFilter会拦截(基于@CanDistro注解)。判断请求参数中的groupServiceName是否属于当前节点管理范围(通过hash取模),如果不属于当前节点管理,转发其他节点处理,用其他节点的返回信息返回客户端;如果属于当前节点管理,直接进入Controller处理。

  • 服务订阅/查询:根据Distro协议,集群每个节点都存储所有数据,每个节点都可以处理读请求,返回当前节点注册表里的数据,无论数据是否是最新的。服务查询是根据客户端提供的namespace、groupServiceName定位到服务端ServiceManager管理的Service。另外1、如果udpPort大于0,执行服务订阅,会注册客户端UDP监听信息到内存Map,待注册表变更后通过UDP通知客户端;2、当某个服务下的实例大量(通过service.protectThreshold控制,默认0)下线时,会开启保护模式。服务端认为自己发生了网络分区,将所有实例认为是健康状态返回给客户端。

  • 服务注册:会经过DistroFilter,只能由责任节点处理。主要做了三件事情:1、更新当前节点的注册表(内存Map);2、将更新后的Service同步至其他节点(Distro协议规定每个节点都能执行查询请求,每个节点都有全量数据);3、UDP推送监听该服务的客户端。

  • 客户端心跳:会经过DistroFilter。客户端每5s向服务端发起心跳请求PUT /nacos/v1/ns/instance/beat,服务端会更新内存中Instance的上次心跳时间和健康状况;服务端每个Service,每5s执行一次心跳超时检测任务,如果Instance超过15秒没有发送过心跳,设置Instance非健康,如果Instance超过30秒没有发送过心跳,直接摘除Instance。心跳相关时间配置,可以通过控制台或服务注册时设置在Instance的metadata中。

    含义 Instance元数据Key 默认值
    客户端发送心跳间隔(毫秒) preserved.heart.beat.interval 5000
    心跳超时时间(标记实例非健康)(毫秒) preserved.heart.beat.timeout 15000
    心跳超时时间(摘除实例)(毫秒) preserved.ip.delete.timeout 30000
  • 集群数据同步:当发生服务注册或服务注销(包含客户端30s心跳超时)时,责任节点会将服务数据同步至其他非责任节点。当服务端检测到客户端心跳15s超时(不满30s),只会在当前责任节点标记实例为非健康状态,不会将非健康状态同步至其他节点;当服务端重新接收到客户端心跳后(15-30s中间),重新标记实例为健康,也不会做数据同步。责任节点每5秒(默认nacos.core.protocol.distro.data.verify_interval_ms=5000ms),同步所有自己负责Service的Instance列表的MD5到其他节点,其他节点如果发现MD5发生变更,会反查责任节点,然后更新本地数据。

以上是关于Nacos源码1.4.1注册中心服务端的主要内容,如果未能解决你的问题,请参考以下文章

Nacos源码1.4.1配置中心服务端

nacos源码解析-注册中心服务注册处理

Nacos源码1.4.1配置中心客户端

微服务架构 | *3.5 Nacos 服务注册与发现的源码分析

Nacos源码之服务端AP架构集群节点数据的同步

SpringCloud Nacos 注册中心 -- 认识和安装Nacos