Nacos 2.0原理解析:Distro协议
Posted zyxzcr
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Nacos 2.0原理解析:Distro协议相关的知识,希望对你有一定的参考价值。
为什么Nacos需要一致性协议?
简单来说就是为了保证在集群模式下各个节点之间数据一致性以及数据同步。
Distro协议是什么?
Distro 协议是 Nacos 社区自研的一种 AP 分布式协议,是面向临时实例设计的一种分布式协议,其保证了在某些 Nacos 节点宕机后,整个临时实例处理系统依旧可以正常工作。
Distro协议的设计思想
-
Nacos 每个节点是平等的都可以处理写请求,同时把新数据同步到其他节点。
-
每个节点只负责部分数据,定时发送自己负责数据的校验值到其他节点来保持数据一致性。
-
每个节点独立处理读请求,及时从本地发出响应。
源码分析
数据初始化
新加入的 Distro 节点会进行全量数据拉取。具体操作是轮询所有的 Distro 节点,通过向其他的机器发送请求拉取全量数据。
首先,在DistroProtocol类的构造方法中启动了一个startDistroTask()任务,其中包括了初始化同步任务 startLoadTask()
private void startDistroTask()
if (EnvUtil.getStandaloneMode())
isInitialized = true;
return;
startVerifyTask();
startLoadTask();
startLoadTask()数据加载任务创建了一个DistroLoadDataTask任务,并传入了一个修改当前节点Distro协议完成状态的回调函数。
private void startLoadTask()
DistroCallback loadCallback = new DistroCallback()
@Override
public void onSuccess()
isInitialized = true;
@Override
public void onFailed(Throwable throwable)
isInitialized = false;
;
GlobalExecutor.submitLoadDataTask(
new DistroLoadDataTask(memberManager, distroComponentHolder, DistroConfig.getInstance(), loadCallback));
加载任务load()启动,调用loadAllDataSnapshotFromRemote获取同步数据。
private void load() throws Exception
// 若出自身之外没有其他节点,则休眠1秒,可能其他节点还未启动完毕
while (memberManager.allMembersWithoutSelf().isEmpty())
Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
TimeUnit.SECONDS.sleep(1);
// 若数据类型为空,说明distroComponentHolder的组件注册器还未初始化完毕
while (distroComponentHolder.getDataStorageTypes().isEmpty())
Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
TimeUnit.SECONDS.sleep(1);
// 加载每个类型的数据
for (String each : distroComponentHolder.getDataStorageTypes())
if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each))
// 调用加载方法,并标记已处理
loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
从其他节点获取同步数据,使用DistroTransportAgent获取数据,使用DistroDataProcessor来处理数据。
private boolean loadAllDataSnapshotFromRemote(String resourceType)
// 获取数据传输对象
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
// 获取数据处理器
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == transportAgent || null == dataProcessor)
Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type , transportAgent: , dataProcessor: ",
resourceType, transportAgent, dataProcessor);
return false;
// 向每个节点请求数据
for (Member each : memberManager.allMembersWithoutSelf())
try
Loggers.DISTRO.info("[DISTRO-INIT] load snapshot from ", resourceType, each.getAddress());
// 获取数据
DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
// 解析数据
boolean result = dataProcessor.processSnapshot(distroData);
Loggers.DISTRO
.info("[DISTRO-INIT] load snapshot from result: ", resourceType, each.getAddress(),
result);
// 若解析成功,标记此类型数据已加载完毕
if (result)
distroComponentHolder.findDataStorage(resourceType).finishInitial();
return true;
catch (Exception e)
Loggers.DISTRO.error("[DISTRO-INIT] load snapshot from failed.", resourceType, each.getAddress(), e);
return false;
使用DistroTransportAgent获取数据。
public DistroData getDatumSnapshot(String targetServer)
// 从节点管理器获取目标节点信息
Member member = memberManager.find(targetServer);
// 判断目标服务器是否健康
if (checkTargetServerStatusUnhealthy(member))
throw new DistroException(
String.format("[DISTRO] Cancel get snapshot caused by target server %s unhealthy", targetServer));
// 构建请求参数
DistroDataRequest request = new DistroDataRequest();
// 设置请求的操作类型为DataOperation.SNAPSHOT
request.setDataOperation(DataOperation.SNAPSHOT);
try
// 使用Rpc代理对象发送同步rpc请求
Response response = clusterRpcClientProxy.sendRequest(member, request);
if (checkResponse(response))
return ((DistroDataResponse) response).getDistroData();
else
throw new DistroException(
String.format("[DISTRO-FAILED] Get snapshot request to %s failed, code: %d, message: %s",
targetServer, response.getErrorCode(), response.getMessage()));
catch (NacosException e)
throw new DistroException("[DISTRO-FAILED] Get distro snapshot failed! ", e);
使用DistroDataProcessor处理数据,调用handlerClientSyncData方法进行处理。
public boolean processSnapshot(DistroData distroData)
// 反序列化获取的DistroData为ClientSyncDatumSnapshot
ClientSyncDatumSnapshot snapshot = ApplicationUtils.getBean(Serializer.class)
.deserialize(distroData.getContent(), ClientSyncDatumSnapshot.class);
// 处理结果集,这里将返回远程节点负责的所有client以及client下面的service、instance信息
for (ClientSyncData each : snapshot.getClientSyncDataList())
// 每次处理一个client
handlerClientSyncData(each);
return true;
handlerClientSyncData方法。
private void handlerClientSyncData(ClientSyncData clientSyncData)
Loggers.DISTRO.info("[Client-Add] Received distro client sync data ", clientSyncData.getClientId());
// 因为是同步数据,因此创建IpPortBasedClient,并缓存
clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
Client client = clientManager.getClient(clientSyncData.getClientId());
// 升级此客户端的服务信息
upgradeClient(client, clientSyncData);
关键方法upgradeClient。
private void upgradeClient(Client client, ClientSyncData clientSyncData)
List<String> namespaces = clientSyncData.getNamespaces();
List<String> groupNames = clientSyncData.getGroupNames();
List<String> serviceNames = clientSyncData.getServiceNames();
List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
// 已同步的服务集合
Set<Service> syncedService = new HashSet<>();
for (int i = 0; i < namespaces.size(); i++)
// 从获取的数据中构建一个Service对象
Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
Service singleton = ServiceManager.getInstance().getSingleton(service);
// 标记此service已被处理
syncedService.add(singleton);
// 获取当前的实例
InstancePublishInfo instancePublishInfo = instances.get(i);
// 判断是否已经包含当前实例
if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton)))
// 不包含则添加
client.addServiceInstance(singleton, instancePublishInfo);
// 当前节点发布服务注册事件
NotifyCenter.publishEvent(
new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
// 若当前client内部已发布的service不在本次同步的列表内,说明已经过时了,要删掉
for (Service each : client.getAllPublishedService())
if (!syncedService.contains(each))
client.removeServiceInstance(each);
// 发布客户端下线事件
NotifyCenter.publishEvent(
new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
至此就完成了数据初始化同步,在全量拉取操作完成之后,Nacos 的每台机器上都维护了当前的所有注册上来的非持久化实例数据。
增量数据同步
数据完成初始化后,节点的数据发生变化后需要讲增量数据同步到其他节点。
DistroClientDataProcessor类继承了SmartSubscriber,遵循Subscriber/Notify模式,当有订阅的事件时会进行回调通知。DistroClientDataProcessor订阅了ClientChangedEvent、ClientDisconnectEvent和ClientVerifyFailedEvent事件。
public List<Class<? extends Event>> subscribeTypes()
List<Class<? extends Event>> result = new LinkedList<>();
result.add(ClientEvent.ClientChangedEvent.class);
result.add(ClientEvent.ClientDisconnectEvent.class);
result.add(ClientEvent.ClientVerifyFailedEvent.class);
return result;
这里我们重点关注ClientChangedEvent事件,当ClientChangedEvent事件发生时,DefaultPublisher会回调onEvent方法。
public void onEvent(Event event)
if (EnvUtil.getStandaloneMode())
return;
if (!upgradeJudgement.isUseGrpcFeatures())
return;
if (event instanceof ClientEvent.ClientVerifyFailedEvent)
syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
else
//增量同步调用方法
syncToAllServer((ClientEvent) event);
syncToAllServer方法调用DistroProtocol类的sync方法进行数据同步。
private void syncToAllServer(ClientEvent event)
Client client = event.getClient();
// Only ephemeral data sync by Distro, persist client should sync by raft.
if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client))
return;
if (event instanceof ClientEvent.ClientDisconnectEvent)
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
distroProtocol.sync(distroKey, DataOperation.DELETE);
//节点变更事件,即增量数据的同步方法
else if (event instanceof ClientEvent.ClientChangedEvent)
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
distroProtocol.sync(distroKey, DataOperation.CHANGE);
向除本节点外的所有节点进行数据同步,对每个节点执行具体的同步逻辑syncToTarget方法。
public void sync(DistroKey distroKey, DataOperation action, long delay)
for (Member each : memberManager.allMembersWithoutSelf())
syncToTarget(distroKey, action, each.getAddress(), delay);
调用distroTaskEngineHolder发布延迟任务。
public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay)
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
targetServer);
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled())
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] to ", distroKey, targetServer);
调用DistroDelayTaskProcessor的process() 方法进行任务投递。执行变更任务 DistroSyncChangeTask向指定节点发送消息。
public boolean process(NacosTask task)
if (!(task instanceof DistroDelayTask))
return true;
DistroDelayTask distroDelayTask = (DistroDelayTask) task;
DistroKey distroKey = distroDelayTask.getDistroKey();
switch (distroDelayTask.getAction())
case DELETE:
DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
return true;
case CHANGE:
case ADD:
DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
return true;
default:
return false;
使用DistroClientTransportAgent进行实际的数据发送。
public boolean syncData(DistroData data, String targetServer)
if (isNoExistTarget(targetServer))
return true;
DistroDataRequest request = new DistroDataRequest(data, data.getType());
Member member = memberManager.find(targetServer);
if (checkTargetServerStatusUnhealthy(member))
Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server unhealthy", targetServer);
return false;
try
// 使用Rpc代理对象发送同步rpc请求
Response response = clusterRpcClientProxy.sendRequest(member, request);
return checkResponse(response);
catch (NacosException e)
Loggers.DISTRO.error("[DISTRO-FAILED] Sync dist以上是关于Nacos 2.0原理解析:Distro协议的主要内容,如果未能解决你的问题,请参考以下文章