AP/CP Mode Implementation and Switching in the Nacos Service Registry

Posted by Leo Han



This article analyzes Nacos based on the Nacos 2.0 source code.

The Nacos service registry runs in AP mode by default. To register an instance through the CP path instead, the client sets spring.cloud.nacos.discovery.ephemeral=false; the default value true marks the instance as ephemeral, which means the AP path is used.
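For example, a Spring Cloud Alibaba client that wants its instance registered as persistent (the CP path) might configure the following (the service name is a placeholder):

```properties
# Hypothetical application.properties sketch
spring.application.name=demo-service
spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
# false = persistent instance, registered through the CP (Raft) path;
# true (the default) = ephemeral instance, registered through the AP (Distro) path
spring.cloud.nacos.discovery.ephemeral=false
```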

Next, let's look at how Nacos implements the AP and CP modes.

First, some background: Nacos listens on HTTP port 8848 by default. Nacos 2.0 adds gRPC in two places: communication with clients and communication between cluster nodes. Both gRPC ports are derived from the HTTP port by a fixed offset: the client-facing port is offset by 1000 (default 8848 + 1000 = 9848), and the cluster port by 1001 (default 8848 + 1001 = 9849). If your network security policy requires opening ports explicitly, all three ports must be opened: 8848, 9848, and 9849.
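The port derivation can be sketched in a few lines. The offset constants below match the documented Nacos 2.0 defaults; the class itself is made up for illustration:

```java
// Illustrative sketch (not Nacos source): how the gRPC ports are derived from the HTTP port.
public class NacosPortOffsets {

    static final int SDK_GRPC_OFFSET = 1000;     // client <-> server gRPC
    static final int CLUSTER_GRPC_OFFSET = 1001; // server <-> server gRPC

    static int sdkGrpcPort(int httpPort) {
        return httpPort + SDK_GRPC_OFFSET;
    }

    static int clusterGrpcPort(int httpPort) {
        return httpPort + CLUSTER_GRPC_OFFSET;
    }

    public static void main(String[] args) {
        System.out.println(sdkGrpcPort(8848));     // 9848
        System.out.println(clusterGrpcPort(8848)); // 9849
    }
}
```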

AP Mode

In the Nacos service registry (the naming service), AP mode communicates over gRPC using a home-grown protocol named Distro. Let's see how the client registers a service; registration is driven by NacosNamingService:

    public void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName)
            throws NacosException {
        Instance instance = new Instance();
        instance.setIp(ip);
        instance.setPort(port);
        instance.setWeight(1.0);
        instance.setClusterName(clusterName);
        registerInstance(serviceName, groupName, instance);
    }

    public void registerInstance(String serviceName, Instance instance) throws NacosException {
        registerInstance(serviceName, Constants.DEFAULT_GROUP, instance);
    }

    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
        clientProxy.registerService(serviceName, groupName, instance);
    }

Registration is ultimately delegated to clientProxy, whose implementation is NamingClientProxyDelegate:

    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
    }

    private NamingClientProxy getExecuteClientProxy(Instance instance) {
        return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
    }

As you can see, NamingClientProxyDelegate checks whether the instance being registered is ephemeral: ephemeral instances go through the gRPC client, persistent ones through the HTTP client.
As noted above, instance.isEphemeral() defaults to true (ephemeral), so registration normally uses the gRPC client.
The gRPC client here is implemented by NamingGrpcClientProxy:

    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
                instance);
        InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
                NamingRemoteConstants.REGISTER_INSTANCE, instance);
        requestToServer(request, Response.class);
        namingGrpcConnectionEventListener.cacheInstanceForRedo(serviceName, groupName, instance);
    }

In the end, an InstanceRequest is sent to the server over gRPC.
At this point the client has finished sending the registration request; next, let's see how the server handles it.
The server-side gRPC servers all extend BaseGrpcServer, and the subclasses mainly differ in which thread pool they use: GrpcSdkServer handles communication with clients, while GrpcClusterServer handles communication between cluster nodes.
Incoming requests are dispatched in GrpcRequestAcceptor, which looks up the RequestHandler registered for the request type in RequestHandlerRegistry.
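The dispatch idea behind GrpcRequestAcceptor and RequestHandlerRegistry can be sketched as a type-keyed handler map. This is a simplified, hypothetical model; the class and method names below are invented, not the real Nacos classes:

```java
import java.util.*;
import java.util.function.Function;

// Sketch of "one handler per request type" dispatch (hypothetical names).
public class HandlerRegistrySketch {

    interface Request {}

    record InstanceRequest(String type) implements Request {}

    // request type -> handler: the idea behind RequestHandlerRegistry
    static final Map<Class<? extends Request>, Function<Request, String>> registry = new HashMap<>();

    static void register(Class<? extends Request> type, Function<Request, String> handler) {
        registry.put(type, handler);
    }

    // the GrpcRequestAcceptor role: look up the handler by request type and delegate
    static String dispatch(Request request) {
        Function<Request, String> handler = registry.get(request.getClass());
        if (handler == null) {
            throw new IllegalArgumentException("Unsupported request type " + request.getClass().getSimpleName());
        }
        return handler.apply(request);
    }

    public static void main(String[] args) {
        register(InstanceRequest.class, r -> "handled " + ((InstanceRequest) r).type());
        System.out.println(dispatch(new InstanceRequest("registerInstance"))); // handled registerInstance
    }
}
```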

The registration request sent by the client has type InstanceRequest, so it is handled by InstanceRequestHandler:

    public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
        Service service = Service
                .newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
        switch (request.getType()) {
            case NamingRemoteConstants.REGISTER_INSTANCE:
                return registerInstance(service, request, meta);
            case NamingRemoteConstants.DE_REGISTER_INSTANCE:
                return deregisterInstance(service, request, meta);
            default:
                throw new NacosException(NacosException.INVALID_PARAM,
                        String.format("Unsupported request type %s", request.getType()));
        }
    }

    private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {
        clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
        return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
    }

As shown, InstanceRequestHandler handles both instance registration and deregistration; the actual work is delegated to the AP-mode implementation, EphemeralClientOperationServiceImpl:

    public void registerInstance(Service service, Instance instance, String clientId) {
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        Client client = clientManager.getClient(clientId);
        InstancePublishInfo instanceInfo = getPublishInfo(instance);
        client.addServiceInstance(singleton, instanceInfo);
        client.setLastUpdatedTime();
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
        NotifyCenter
                .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
    }

In AP mode, the registering connection is first resolved to its Client through clientManager, and the registered instance is written directly into that Client as an InstancePublishInfo; the services published by each client node can later be retrieved through the ClientManager.

Pay particular attention to client.addServiceInstance(singleton, instanceInfo): in AP mode, Nacos stores the service information in the Client and in the ServiceManager. addServiceInstance is implemented as follows:

    public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
        if (null == publishers.put(service, instancePublishInfo)) {
            MetricsMonitor.incrementInstanceCount();
        }
        NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
        Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
        return true;
    }

This publishes a ClientEvent.ClientChangedEvent, after which the registered service information is asynchronously synced to the other cluster nodes. The event is handled in DistroClientDataProcessor:

    public void onEvent(Event event) {
        if (EnvUtil.getStandaloneMode()) {
            return;
        }
        if (!upgradeJudgement.isUseGrpcFeatures()) {
            return;
        }
        if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
            syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
        } else {
            syncToAllServer((ClientEvent) event);
        }
    }

which eventually takes the syncToAllServer path:

    // DistroClientDataProcessor.java
    private void syncToAllServer(ClientEvent event) {
        Client client = event.getClient();
        if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
            return;
        }
        if (event instanceof ClientEvent.ClientDisconnectEvent) {
            DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
            distroProtocol.sync(distroKey, DataOperation.DELETE);
        } else if (event instanceof ClientEvent.ClientChangedEvent) {
            DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
            distroProtocol.sync(distroKey, DataOperation.CHANGE);
        }
    }

    // DistroProtocol.java
    public void sync(DistroKey distroKey, DataOperation action, long delay) {
        for (Member each : memberManager.allMembersWithoutSelf()) {
            syncToTarget(distroKey, action, each.getAddress(), delay);
        }
    }

The concrete sync logic lives in DistroProtocol, Nacos's implementation of the Distro protocol; the sync is finally wrapped in a DistroSyncChangeTask, which implements the Runnable interface:

    public void run() {
        String type = getDistroKey().getResourceType();
        DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(type);
        if (null == transportAgent) {
            Loggers.DISTRO.warn("No found transport agent for type [{}]", type);
            return;
        }
        Loggers.DISTRO.info("[DISTRO-START] {}", toString());
        if (transportAgent.supportCallbackTransport()) {
            doExecuteWithCallback(new DistroExecuteCallback());
        } else {
            executeDistroTask();
        }
    }

    private void executeDistroTask() {
        try {
            boolean result = doExecute();
            if (!result) {
                handleFailedTask();
            }
            Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
        } catch (Exception e) {
            handleFailedTask();
        }
    }

Now let's look at doExecute:

    protected boolean doExecute() {
        String type = getDistroKey().getResourceType();
        DistroData distroData = getDistroData(type);
        if (null == distroData) {
            Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());
            return true;
        }
        return getDistroComponentHolder().findTransportAgent(type)
                .syncData(distroData, getDistroKey().getTargetServer());
    }

This fetches the data to sync via getDistroData, which ultimately calls DistroClientDataProcessor.getDistroData:

    public DistroData getDistroData(DistroKey distroKey) {
        Client client = clientManager.getClient(distroKey.getResourceKey());
        if (null == client) {
            return null;
        }
        byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());
        return new DistroData(distroKey, data);
    }

That is, the full set of instances registered by this client node is obtained through clientManager and serialized for syncing.
Separately, when NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId)) is published, ClientServiceIndexesManager.onEvent is invoked and the entry is added to ClientServiceIndexesManager.publisherIndexes, a Map&lt;Service, Set&lt;String&gt;&gt; that records all providers of a service. That in turn publishes a ServiceEvent.ServiceChangedEvent, which is finally handled in NamingSubscriberServiceV2Impl.onEvent and pushes (via gRPC) the newly registered instance information to the service's subscribers.
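The event flow above follows an ordinary publish/subscribe pattern. Here is a minimal sketch of the idea; this is not the real NotifyCenter API, and all class and method names are invented for illustration:

```java
import java.util.*;
import java.util.function.Consumer;

// Minimal publish/subscribe sketch (hypothetical; not the Nacos NotifyCenter API).
public class EventBusSketch {

    // event type -> list of handlers for that type
    static final Map<Class<?>, List<Consumer<Object>>> subscribers = new HashMap<>();

    static <E> void subscribe(Class<E> type, Consumer<E> handler) {
        subscribers.computeIfAbsent(type, k -> new ArrayList<>())
                .add(e -> handler.accept(type.cast(e)));
    }

    static void publish(Object event) {
        subscribers.getOrDefault(event.getClass(), List.of())
                .forEach(h -> h.accept(event));
    }

    record ClientChangedEvent(String clientId) {}

    public static void main(String[] args) {
        // a DistroClientDataProcessor-style listener reacting to client changes
        subscribe(ClientChangedEvent.class, e -> System.out.println("sync " + e.clientId()));
        publish(new ClientChangedEvent("client-1")); // prints "sync client-1"
    }
}
```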

When ClientOperationEvent.ClientRegisterServiceEvent is published during registration, ClientServiceIndexesManager handles the event like this:

    private void addPublisherIndexes(Service service, String clientId) {
        publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
        publisherIndexes.get(service).add(clientId);
        NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
    }

As you can see, the handling is straightforward: the published service and the id of the publishing client are recorded in a ConcurrentMap&lt;Service, Set&lt;String&gt;&gt;. Later, to get all instances of a service, ClientServiceIndexesManager maps the Service to the set of clientIds, and ClientManager then maps those clientIds to the full instance information of every node providing the service.
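That two-step lookup can be sketched with plain maps. The types below are simplified and hypothetical (names are borrowed from Nacos for readability, but these are not the real classes):

```java
import java.util.*;
import java.util.concurrent.*;

// Sketch of the publisher index: Service -> clientIds -> published instances.
public class PublisherIndexSketch {

    record Service(String group, String name) {}
    record InstancePublishInfo(String ip, int port) {}

    // clientId -> (service -> published instance): a minimal stand-in for ClientManager
    static final Map<String, Map<Service, InstancePublishInfo>> clients = new ConcurrentHashMap<>();

    // service -> clientIds publishing it: the ClientServiceIndexesManager.publisherIndexes idea
    static final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();

    static void register(String clientId, Service service, InstancePublishInfo info) {
        clients.computeIfAbsent(clientId, k -> new ConcurrentHashMap<>()).put(service, info);
        publisherIndexes.computeIfAbsent(service, k -> ConcurrentHashMap.newKeySet()).add(clientId);
    }

    // resolve a service to its instances: index lookup, then client lookup
    static List<InstancePublishInfo> instancesOf(Service service) {
        List<InstancePublishInfo> result = new ArrayList<>();
        for (String clientId : publisherIndexes.getOrDefault(service, Set.of())) {
            InstancePublishInfo info = clients.getOrDefault(clientId, Map.of()).get(service);
            if (info != null) {
                result.add(info);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Service svc = new Service("DEFAULT_GROUP", "demo-service");
        register("client-1", svc, new InstancePublishInfo("10.0.0.1", 8080));
        register("client-2", svc, new InstancePublishInfo("10.0.0.2", 8080));
        System.out.println(instancesOf(svc).size()); // 2
    }
}
```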

CP Mode

For CP mode, Nacos 2 uses the Raft protocol, implemented with Alibaba's open-source SOFA-JRaft. For background, see: Raft algorithm implementation with SOFA-JRaft — leader election, writes, and log replication.

On the client side, the request is sent through NamingHttpClientProxy:

    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        if (instance.isEphemeral()) {
            BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
            beatReactor.addBeatInfo(groupedServiceName, beatInfo);
        }
        final Map<String, String> params = new HashMap<String, String>(16);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, groupedServiceName);
        params.put(CommonParams.GROUP_NAME, groupName);
        params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
        reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
    }

On the server side, InstanceController handles the request:

    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        final Instance instance = parseInstance(request);
        getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }

Persistence ultimately happens in PersistentServiceProcessor.put:

    public void put(String key, Record value) throws NacosException {
        final BatchWriteRequest req = new BatchWriteRequest();
        Datum datum = Datum.createDatum(key, value);
        req.append(ByteUtils.toBytes(key), serializer.serialize(datum));
        // ... (the batch is then wrapped in a write request and submitted through the JRaft protocol)
    }
