Nacos中服务注册中心APCP模式实现,APCP模式切换
Posted Leo Han
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Nacos中服务注册中心APCP模式实现,APCP模式切换相关的知识,希望对你有一定的参考价值。
本文分析Nacos基于Nacos 2.0
Nacos中服务注册中心默认是AP模式,如果设置为CP模式
那么客户端设置 spring.cloud.nacos.discovery.ephemeral=false (默认为true) ,表示是启用AP模式
接下来我们看看Nacos中对于AP、CP模式是怎么实现的。
首先说明一下,在Nacos中默认基于HTTP的端口号是8848 ,Nacos2.0增加了gRPC,而gRPC有两个地方,一个是和客户端通信,一个是集群节点之间的通信。 Nacos中gRPC的端口号都是基于HTTP端口进行一定的漂移,客户端通信端口是漂移1000,即默认为:8848+1000=9848,而集群之间的通信端口漂移 1001,即8848+·1001=9849,这里需要注意如果网络安全需要开启相关端口的话,那么这里需要开通,8848,9848,9849,三个端口号
AP模式
在Nacos服务注册,即naming服务中,对于AP模式,是采用gRPC通信的,而协议则是自己实现了一个名为Distro
协议。我们看下客户端是怎么进行服务注册的,客户端进行服务注册主要是通过NacosNamingService
来实现的:
public void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName)
throws NacosException
Instance instance = new Instance();
instance.setIp(ip);
instance.setPort(port);
instance.setWeight(1.0);
instance.setClusterName(clusterName);
registerInstance(serviceName, groupName, instance);
public void registerInstance(String serviceName, Instance instance) throws NacosException
registerInstance(serviceName, Constants.DEFAULT_GROUP, instance);
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException
NamingUtils.checkInstanceIsLegal(instance);
clientProxy.registerService(serviceName, groupName, instance);
而这里最终会通过clientProxy
去进行服务注册,实现为NamingClientProxyDelegate
:
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException
getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
private NamingClientProxy getExecuteClientProxy(Instance instance)
return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
可以看到在NamingClientProxyDelegate
中会判断注册的服务的实例是否是临时的,如果是临时的则用gRPCClient否则httpClient请求
我们知道,默认instance.isEphemeral=true
即是临时的,应采用gRPCClient去进行服务注册。
这里gRPCClient实现为NamingGrpcClientProxy
:
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException
NAMING_LOGGER.info("[REGISTER-SERVICE] registering service with instance ", namespaceId, serviceName,
instance);
InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
NamingRemoteConstants.REGISTER_INSTANCE, instance);
requestToServer(request, Response.class);
namingGrpcConnectionEventListener.cacheInstanceForRedo(serviceName, groupName, instance);
最后通过gRPC向服务端发送了一个InstanceRequest
请求。
到这里,客户端就完成了服务注册请求的发送,接下来看看服务端怎么处理的。
服务端gRPC实现都是继承BaseGrpcServer,其子类主要是不同线程池的选择,其中GrpcSdkServer用来处理和客户端之间的通信,GrpcClusterServer用来集群节点之间的通信
。
而相关请求在GrpcRequestAcceptor
根据不同请求类型获取RequestHandlerRegistry
对应的RequestHandler
进行处理。
这里服务注册客户端发送的请求类型为InstanceRequest
,请求将交由InstanceRequestHandler
处理:
public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException
Service service = Service
.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
switch (request.getType())
case NamingRemoteConstants.REGISTER_INSTANCE:
return registerInstance(service, request, meta);
case NamingRemoteConstants.DE_REGISTER_INSTANCE:
return deregisterInstance(service, request, meta);
default:
throw new NacosException(NacosException.INVALID_PARAM,
String.format("Unsupported request type %s", request.getType()));
private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta)
clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
可以看到,InstanceRequestHandler
将会处理服务注册和服务下线两个处理,而这里实际处理会交给AP模式的实现EphemeralClientOperationServiceImpl
:
public void registerInstance(Service service, Instance instance, String clientId)
Service singleton = ServiceManager.getInstance().getSingleton(service);
Client client = clientManager.getClient(clientId);
InstancePublishInfo instanceInfo = getPublishInfo(instance);
client.addServiceInstance(singleton, instanceInfo);
client.setLastUpdatedTime();
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
NotifyCenter
.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
这里可以看到,在AP模式下,首先就会将注册的实例信息通过
clientManager获取到对应的Client信息,直接写入到Client中,而这里写入的则是一个
InstancePublishInfo的信息,后续会通过ClientManager能够获取到各个客户端节点发布的服务信息
这里比较需要注意的是client.addServiceInstance(singleton, instanceInfo);
,在Nacos中AP是将服务信息放入到了client和ServiceManager中。而client的addServiceInstance
实现如下:
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo)
if (null == publishers.put(service, instancePublishInfo))
MetricsMonitor.incrementInstanceCount();
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
Loggers.SRV_LOG.info("Client change for service , ", service, getClientId());
return true;
这里会发布ClientEvent.ClientChangedEvent
事件,然后会异步的将注册的服务信息同步和集群其他节点,而这个事件的处理在DistroClientDataProcessor
:
public void onEvent(Event event)
if (EnvUtil.getStandaloneMode())
return;
if (!upgradeJudgement.isUseGrpcFeatures())
return;
if (event instanceof ClientEvent.ClientVerifyFailedEvent)
syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
else
syncToAllServer((ClientEvent) event);
最终会走syncToAllServer
的逻辑:
// DistroClientDataProcessor.java
private void syncToAllServer(ClientEvent event)
Client client = event.getClient();
if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client))
return;
if (event instanceof ClientEvent.ClientDisconnectEvent)
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
distroProtocol.sync(distroKey, DataOperation.DELETE);
else if (event instanceof ClientEvent.ClientChangedEvent)
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
distroProtocol.sync(distroKey, DataOperation.CHANGE);
// DistroProtocol.java
public void sync(DistroKey distroKey, DataOperation action, long delay)
for (Member each : memberManager.allMembersWithoutSelf())
syncToTarget(distroKey, action, each.getAddress(), delay);
具体实现则是在DistroProtocol
中,则是一个Distro
协议的实现,这里最后是封装成一个DistroSyncChangeTask
,实现了Runnable
接口:
public void run()
String type = getDistroKey().getResourceType();
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(type);
if (null == transportAgent)
Loggers.DISTRO.warn("No found transport agent for type []", type);
return;
Loggers.DISTRO.info("[DISTRO-START] ", toString());
if (transportAgent.supportCallbackTransport())
doExecuteWithCallback(new DistroExecuteCallback());
else
executeDistroTask();
private void executeDistroTask()
try
boolean result = doExecute();
if (!result)
handleFailedTask();
Loggers.DISTRO.info("[DISTRO-END] result: ", toString(), result);
catch (Exception e)
handleFailedTask();
我们看看doExecute
:
protected boolean doExecute()
String type = getDistroKey().getResourceType();
DistroData distroData = getDistroData(type);
if (null == distroData)
Loggers.DISTRO.warn("[DISTRO] with null data to sync, skip", toString());
return true;
return getDistroComponentHolder().findTransportAgent(type)
.syncData(distroData, getDistroKey().getTargetServer());
这里会获取需要同步的数据,通过getDistroData
,而最终会调用DistroClientDataProcessor.getDistroData
:
public DistroData getDistroData(DistroKey distroKey)
Client client = clientManager.getClient(distroKey.getResourceKey());
if (null == client)
return null;
byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(client.generateSyncData());
return new DistroData(distroKey, data);
通过clientManager获取到了该客户端节点注册的实例的信息。
另外一个就是在NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
发布该事件的时候,会调用ClientServiceIndexesManager.onEvent
方法,添加到ClientServiceIndexesManager.publisherIndexes 这是一个Map<Service, Set<String>>
结构,记录了一个服务的所有提供方,然后会发布ServiceEvent.ServiceChangedEvent
事件,这个事件最后会在NamingSubscriberServiceV2Impl.onEvent
中处理,会通过push方式(gRPC)将新上线的服务信息推送给消费方
另外在服务注册的时候发布ClientOperationEvent.ClientRegisterServiceEvent
的时候也会在ClientServiceIndexesManager
中处理该事件:
private void addPublisherIndexes(Service service, String clientId)
publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
publisherIndexes.get(service).add(clientId);
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
可以看到,处理很简单就是将发布的服务信息和对应的客户端节点放入到了一个ConcurrentMap<Service, Set<String>>
中,这样
后面需要获取某个服务的所有的实例的时候,通过ClientServiceIndexesManager和Service能够获取到所有的该服务实例的clientId,然后通过ClientManager和clientId集合能够获取到该服务所有实例节点信息
CP模式
对于CP模式来说,Nacos2使用了Raft协议,在Nacos2中则是使用阿里开源的SOFA-Jraft来实现Raft协议,想了解的可以看这篇 Raft算法实现 - Sofa-JRaft,选主,数据写入,日志复制
客户端则是通过NamingHttpClientProxy
模式发送相关请求。
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
if (instance.isEphemeral())
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
final Map<String, String> params = new HashMap<String, String>(16);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, groupedServiceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
在服务端则是InstanceController
进行处理:
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
final Instance instance = parseInstance(request);
getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
return "ok";
最终在PersistentServiceProcessor.put
中进行持久化:
public void put(String key, Record value) throws NacosException
final BatchWriteRequest req = new BatchWriteRequest();
Datum datum = Datum.createDatum(key, value);
req.append(ByteUtils.toBytes(key), serializer.serialize(datum));
final 以上是关于Nacos中服务注册中心APCP模式实现,APCP模式切换的主要内容,如果未能解决你的问题,请参考以下文章
Springcloud 整合Alibaba的Nacos实现服务注册和配置中心