nacos1.1.4源码第五天 ap的实现
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了nacos1.1.4源码第五天 ap的实现相关的知识,希望对你有一定的参考价值。
参考技术A Nacos 集群默认支持的是CAP原则中的AP原则nacos后面的版本可以切换cp和ap
cp ap状态切换命令:curl -X PUT '$NACOS_SERVER:8848/nacos/v1/ns/operator/switches?entry=serverMode&value=CP'
目前1.1.4里面 如果是临时节点就是ap 持久化节点就是cp
前面client注册服务的时候启动的心跳线程 如果是临时节点的话,随机选择一个服务端进行发送, 发送心跳请求
DistroConsistencyServiceImpl
发送请求 nacos/v1/ns/distro/datum
向所有服务轮询查询并存放节点信息Map<String, Datum>dataMap =new ConcurrentHashMap<>(1024);
Distro协议是阿里的私有协议,但是对外开源框架只有Nacos。所有我们只能从Nacos中一窥Distro协议。Distro协议是一个比较简单的最终一致性协议。整体由节点寻址、数据全量同步、异步增量同步、定时上报client所有信息、心跳探活其他节点等组成
Nacos 是如何同时实现AP与CP的
Nacos 是如何同时实现AP与CP的
两种一致性策略如何在nacos中共存
或许会有疑问,为什么早先的cp
模式的Zookeeper
或者AP
模式的Eureka
,都只有支持CAP
理论下大家常用的AP
实现或者CP
实现,而nacos却能够两个都实现呢?
其实CAP
理论,仅仅是针对分布式下数据的一致性而言,如果你对于数据的一致性要求不高,可忍受最终一致性,那么AP
模式的Eureka
就可以满足你了,如果说你对数据的一致性要求很高,那么就使用CP
模式的Zookeeper
,而追其根本,并不是说Eureka
是AP
的,或者说Zookeeper
是CP
的,而是他们存储的数据的一致性,满足AP
或者CP
,因此也就不难实现在一个组件中实现AP
模式与CP
模式共存
1 2 3 4 5 6 7 8 9 | @Service("consistencyDelegate") public class DelegateConsistencyServiceImpl implements ConsistencyService @Autowired private PersistentConsistencyService persistentConsistencyService; @Autowired private EphemeralConsistencyService ephemeralConsistencyService; |
DelegateConsistencyServiceImpl
是一个一致性策略选择的类,根据不同的策略触发条件(在nacos中,CP
与AP
切换的条件是注册的服务实例是否是临时实例),选择PersistentConsistencyService
策略或者EphemeralConsistencyService
策略,而EphemeralConsistencyService
对应的是DistroConsistencyServiceImpl
,采用的协议是阿里自研的Distro
,我个人觉得就像gossip
协议;PersistentConsistencyService
对应的是RaftConsistencyServiceImpl
,其底层采用的是Raft
协议;这两种一致性策略下的数据存储互不影响,所以nacos
实现了AP
模式与CP
模式在一个组件中同时存在
AP实现
Nacos中的DistroConsistencyServiceImpl工作浅析
Eureka 一致性策略
Eureka是一个AP模式的服务发现框架,在Eureka集群模式下,Eureka采取的是Server之间互相广播各自的数据进行数据复制、更新操作;并且Eureka在客户端与注册中心出现网络故障时,依然能够获取服务注册信息——Eureka实现了客户端对于服务注册信息的缓存
DiscoveryClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | private void fetchRegistryFromBackup() try @SuppressWarnings("deprecation") BackupRegistry backupRegistryInstance = newBackupRegistryInstance(); if (null == backupRegistryInstance) // backward compatibility with the old protected method, in case it is being used. backupRegistryInstance = backupRegistryProvider.get(); if (null != backupRegistryInstance) Applications apps = null; if (isFetchingRemoteRegionRegistries()) String remoteRegionsStr = remoteRegionsToFetch.get(); if (null != remoteRegionsStr) apps = backupRegistryInstance.fetchRegistry(remoteRegionsStr.split(",")); else apps = backupRegistryInstance.fetchRegistry(); if (apps != null) final Applications applications = this.filterAndShuffle(apps); applications.setAppsHashCode(applications.getReconcileHashCode()); localRegionApps.set(applications); logTotalInstances(); logger.info("Fetched registry successfully from the backup"); else logger.warn("No backup registry instance defined & unable to find any discovery servers."); catch (Throwable e) logger.warn("Cannot fetch applications from apps although backup registry was specified", e); |
正因为Eureka为了能够在Eureka集群无法工作时不影响消费者调用服务提供者而设置的客户端缓存,因此Eureka无法保证服务注册信息的强一致性(CP模式),只能满足数据的最终一致性(AP模式)
Nacos一致性策略——Distro
Nacos在AP模式下的一致性策略就类似于Eureka,采用Server
之间互相的数据同步来实现数据在集群中的同步、复制操作。
触发数据广播
1 2 3 4 5 6 7 | DistroConsistencyServiceImpl.java @Override public void put(String key, Record value) throws NacosException onPut(key, value); taskDispatcher.addTask(key); |
当调用ConsistencyService
接口定义的put
、remove
方法时,涉及到了Server
端数据的变更,此时会创建一个任务,将数据的key
传入taskDispatcher.addTask
方法中,用于后面数据变更时数据查找操作
1 2 3 4 5 | TaskDispatcher.java public void addTask(String key) taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key); |
这里有一个方法需要注意——shakeUp
,查看官方代码注解可知这是将key
(key
可以看作是一次数据变更事件)这里应该是将任务均匀的路由到不同的TaskScheduler
对象,确保每个TaskScheduler
所承担的任务都差不多。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 | public class TaskScheduler implements Runnable private int dataSize = 0; private long lastDispatchTime = 0L; private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024); ... public void addTask(String key) queue.offer(key); @Override public void run() List<String> keys = new ArrayList<>(); while (true) try String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),TimeUnit.MILLISECONDS); if (Loggers.EPHEMERAL.isDebugEnabled() && StringUtils.isNotBlank(key)) Loggers.EPHEMERAL.debug("got key: ", key); if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) continue; if (StringUtils.isBlank(key)) continue; if (dataSize == 0) keys = new ArrayList<>(); keys.add(key); dataSize++; if (dataSize == partitionConfig.getBatchSyncKeyCount() || (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) for (Server member : dataSyncer.getServers()) // 自己不需要进行数据广播操作 if (NetUtils.localServer().equals(member.getKey())) continue; SyncTask syncTask = new SyncTask(); syncTask.setKeys(keys); syncTask.setTargetServer(member.getKey()); if (Loggers.EPHEMERAL.isDebugEnabled() && StringUtils.isNotBlank(key)) Loggers.EPHEMERAL.debug("add sync task: ", JSON.toJSONString(syncTask)); dataSyncer.submit(syncTask, 0); lastDispatchTime = System.currentTimeMillis(); dataSize = 0; catch (Exception e) Loggers.EPHEMERAL.error("dispatch sync task failed.", e); |
核心方法就是for (Server member : dataSyncer.getServers()) ..
循环体内的代码,此处就是将数据在Nacos Server
中进行广播操作;具体步骤如下
- 创建`SyncTask`,并设置事件集合(就是`key`集合)
- 将目标`Server`信息设置到`SyncTask`中——`syncTask.setTargetServer(member.getKey())`
- 将数据广播任务提交到`DataSyncer`中
执行数据广播——DataSyncer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 | public void submit(SyncTask task, long delay) // If it's a new task: if (task.getRetryCount() == 0) Iterator<String> iterator = task.getKeys().iterator(); while (iterator.hasNext()) String key = iterator.next(); if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) // associated key already exist: if (Loggers.EPHEMERAL.isDebugEnabled()) Loggers.EPHEMERAL.debug("sync already in process, key: ", key); iterator.remove(); if (task.getKeys().isEmpty()) // all keys are removed: return; GlobalExecutor.submitDataSync(new Runnable() @Override public void run() try if (servers == null || servers.isEmpty()) Loggers.SRV_LOG.warn("try to sync data but server list is empty."); return; List<String> keys = task.getKeys(); if (Loggers.EPHEMERAL.isDebugEnabled()) Loggers.EPHEMERAL.debug("sync keys: ", keys); Map<String, Datum> datumMap = dataStore.batchGet(keys); if (datumMap == null || datumMap.isEmpty()) // clear all flags of this task: for (String key : task.getKeys()) taskMap.remove(buildKey(key, task.getTargetServer())); return; byte[] data = serializer.serialize(datumMap); long timestamp = System.currentTimeMillis(); boolean success = NamingProxy.syncData(data, task.getTargetServer()); if (!success) SyncTask syncTask = new SyncTask(); syncTask.setKeys(task.getKeys()); syncTask.setRetryCount(task.getRetryCount() + 1); syncTask.setLastExecuteTime(timestamp); syncTask.setTargetServer(task.getTargetServer()); retrySync(syncTask); else // clear all flags of this task: for (String key : task.getKeys()) taskMap.remove(buildKey(key, task.getTargetServer())); catch (Exception e) Loggers.EPHEMERAL.error("sync data failed.", e); , delay); |
GlobalExecutor.submitDataSync(Runnable runnable)
提交一个数据广播任务;首先通过SyncTask
中的key
集合去DataStore
中去查询key
所对应的数据集合,然后对数据进行序列化操作,转为byte[]
数组后,执行Http
请求操作——NamingProxy.syncData(data, task.getTargetServer())
;如果数据广播失败,则将任务重新打包再次压入GlobalExecutor
中
(这里有一个疑问,SyncTask记录了任务重试的次数,但是却没有根据该次数做一些判断,比如超过多少次server未响应可能是server挂掉了,这里仅仅是记录了重试的次数)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | public static boolean syncData(byte[] data, String curServer) throws Exception try Map<String, String> headers = new HashMap<>(128); headers.put("Client-Version", UtilsAndCommons.SERVER_VERSION); headers.put("User-Agent", UtilsAndCommons.SERVER_VERSION); headers.put("Accept-Encoding", "gzip,deflate,sdch"); headers.put("Connection", "Keep-Alive"); headers.put("Content-Encoding", "gzip"); HttpClient.HttpResult result = HttpClient.httpPutLarge("http://" + curServer + RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_ON_SYNC_URL, headers, data); if (HttpURLConnection.HTTP_OK == result.code) return true; if (HttpURLConnection.HTTP_NOT_MODIFIED == result.code) return true; throw new IOException("failed to req API:" + "http://" + curServer + RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_ON_SYNC_URL + ". code:" + result.code + " msg: " + result.content); catch (Exception e) Loggers.SRV_LOG.warn("NamingProxy", e); return false; |
这里将数据提交到了URL为PUT http://ip:port/nacos/v1/ns//distro/datum
中,而该URL对应的处理器为DistroController
中的public String onSyncDatum(HttpServletRequest request, HttpServletResponse response)
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | public String onSyncDatum(HttpServletRequest request, HttpServletResponse response) throws Exception String entity = IOUtils.toString(request.getInputStream(), "UTF-8"); if (StringUtils.isBlank(entity)) Loggers.EPHEMERAL.error("[onSync] receive empty entity!"); throw new NacosException(NacosException.INVALID_PARAM, "receive empty entity!"); Map<String, Datum<Instances>> dataMap = serializer.deserializeMap(entity.getBytes(), Instances.class); for (Map.Entry<String, Datum<Instances>> entry : dataMap.entrySet()) if (KeyBuilder.matchEphemeralInstanceListKey(entry.getKey())) String namespaceId = KeyBuilder.getNamespace(entry.getKey()); String serviceName = KeyBuilder.getServiceName(entry.getKey()); if (!serviceManager.containService(namespaceId, serviceName) && switchDomain.isDefaultInstanceEphemeral()) serviceManager.createEmptyService(namespaceId, serviceName, true); consistencyService.onPut(entry.getKey(), entry.getValue().value); return "ok"; |
这里会调用consistencyService.onPut(entry.getKey(), entry.getValue().value)
方法进行数据的更新,注意,onPut
方法并不会涉及taskDispatcher.addTask(key);
操作,而是将数据更新压入了Notifier
的Task
列表中(Notifier
的作用看Nacos Server端注册一个服务实例流程);至此完成了Nacos Server
在AP模式下的数据的最终一致性操作。
CP实现
重要的协议——RAFT
动画演示地址:raft-protocol)
nacos是如何实现CP(raft)的
RaftController
RaftController
控制器是raft
集群内部节点间通信使用的,具体的信息如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | POST HTTP://ip:port/v1/ns/raft/vote : 进行投票请求 POST HTTP://ip:port/v1/ns/raft/beat : Leader向Follower发送心跳信息 GET HTTP://ip:port/v1/ns/raft/peer : 获取该节点的RaftPeer信息 PUT HTTP://ip:port/v1/ns/raft/datum/reload : 重新加载某日志信息 POST HTTP://ip:port/v1/ns/raft/datum : Leader接收传来的数据并存入 DELETE HTTP://ip:port/v1/ns/raft/datum : Leader接收传来的数据删除操作 GET HTTP://ip:port/v1/ns/raft/datum : 获取该节点存储的数据信息 GET HTTP://ip:port/v1/ns/raft/state : 获取该节点的状态信息UP or DOWN POST HTTP://ip:port/v1/ns/raft/datum/commit : Follower节点接收Leader传来得到数据存入操作 DELETE HTTP://ip:port/v1/ns/raft/datum : Follower节点接收Leader传来的数据删除操作 GET HTTP://ip:port/v1/ns/raft/leader : 获取当前集群的Leader节点信息 GET HTTP://ip:port/v1/ns/raft/listeners : 获取当前Raft集群的所有事件监听者 |
RaftPeerSet
这个对象存储的是所有raft
协议下的节点信息,存储的元素如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | // 集群节点地址管理 private ServerListManager serverListManager; // 周期数 private AtomicLong localTerm = new AtomicLong(0L); // 当前周期内的Leader private RaftPeer leader = null; // 所有的节点信息 private Map<String, RaftPeer> peers = new HashMap<String, RaftPeer>(); // 暂时不清楚用途 private Set<String> sites = new HashSet<>(); // 本节点是否已准备完毕 private boolean ready = false; |
同时还具备了raft
协议下必要的方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 | // 当前IP对应的节点是否是Leader public boolean isLeader(String ip) if (STANDALONE_MODE) return true; if (leader == null) Loggers.RAFT.warn("[IS LEADER] no leader is available now!"); return false; return StringUtils.equals(leader.ip, ip); // 决定Leader节点,根据投票结果以及是否满足majorityCount机制 public RaftPeer decideLeader(RaftPeer candidate) peers.put(candidate.ip, candidate); SortedBag ips = new TreeBag(); int maxApproveCount = 0; String maxApprovePeer = null; for (RaftPeer peer : peers.values()) if (StringUtils.isEmpty(peer.voteFor)) continue; // 选票计数 ips.add(peer.voteFor); // 如果某节点的得票数大于当前的最大得票数,则更新候选Leader信息 if (ips.getCount(peer.voteFor) > maxApproveCount) maxApproveCount = ips.getCount(peer.voteFor); maxApprovePeer = peer.voteFor; // 是否满足majorityCount数量的限制 if (maxApproveCount >= majorityCount()) // 若满足则设置Leader节点信息 RaftPeer peer = peers.get(maxApprovePeer); peer.state = RaftPeer.State.LEADER; if (!Objects.equals(leader, peer)) leader = peer; Loggers.RAFT.info(" has become the LEADER", leader.ip); return leader; public RaftPeer makeLeader(RaftPeer candidate) // 如果当前Leader与Candidate节点不一样,则进行Leader信息更改 if (!Objects.equals(leader, candidate)) leader = candidate; Loggers.RAFT.info(" has become the LEADER, local: , leader: ", leader.ip, JSON.toJSONString(local()), JSON.toJSONString(leader)); for (final RaftPeer peer : peers.values()) Map<String, String> params = new HashMap<String, String>(1); // 如果当前节点与远程Leader节点不等且是Follower节点 if (!Objects.equals(peer, candidate) && peer.state == RaftPeer.State.LEADER) try // 获取每个节点的RaftPeer节点信息对象数据 String url = RaftCore.buildURL(peer.ip, RaftCore.API_GET_PEER); HttpClient.asyncHttpGet(url, null, params, new AsyncCompletionHandler<Integer>() @Override public Integer onCompleted(Response response) throws Exception if (response.getStatusCode() != HttpURLConnection.HTTP_OK) Loggers.RAFT.error("[NACOS-RAFT] get peer failed: , peer: ", response.getResponseBody(), peer.ip); peer.state = RaftPeer.State.FOLLOWER; return 1; update(JSON.parseObject(response.getResponseBody(), RaftPeer.class)); return 0; ); catch (Exception e) peer.state = RaftPeer.State.FOLLOWER; Loggers.RAFT.error("[NACOS-RAFT] error while getting peer from peer: ", peer.ip); return update(candidate); |
RaftCore
该对象是nacos
中raft
协议的主要实现,在启动之初,会进行一系列初始化的操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | @PostConstruct public void init() throws Exception Loggers.RAFT.info("initializing Raft sub-system"); executor.submit(notifier); long start = System.currentTimeMillis(); // 进行日志文件的加载到内存数据对象Datums的操作 datums = raftStore.loadDatums(notifier); // 设置当前的周期数 setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L)); Loggers.RAFT.info("cache loaded, datum count: , current term: ", datums.size(), peers.getTerm()); while (true) // 等待上一步的数据加载任务全部完成 if (notifier.tasks.size() <= 0) break; Thread.sleep(1000L); // 初始化标识更改 initialized = true; Loggers.RAFT.info("finish to load data from disk, cost: ms.", (System.currentTimeMillis() - start)); // 开启定时的Leader选举任务 GlobalExecutor.registerMasterElection(new MasterElection()); // 开启定时的Leader心跳服务 GlobalExecutor.registerHeartbeat(new HeartBeat()); Loggers.RAFT.info("timer started: leader timeout ms: , heart-beat timeout ms: ", GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS); |
初始化的一系列操作完成后,此时集群还无法对外提供服务,因为此时Leader
还未选举出来,需要在MasterElection
选举Leader
成功后才可以对外提供服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 | // Leader 选举任务 public class MasterElection implements Runnable @Override public void run() try // 当前节点是否已准备完毕 if (!peers.isReady()) return; // 获取自身节点信息 RaftPeer local = peers.local(); // 本地存储的Leader任期时间 local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS; // 如果Leader任期时间还在允许范围内,则不进行Leader选举 if (local.leaderDueMs > 0) return; // reset timeout local.resetLeaderDue(); local.resetHeartbeatDue(); // 向其他节点发起投票请求 sendVote(); catch (Exception e) Loggers.RAFT.warn("[RAFT] error while master election ", e); public void sendVote() RaftPeer local = peers.get(NetUtils.localServer()); Loggers.RAFT.info("leader timeout, start voting,leader: , term: ", JSON.toJSONString(getLeader()), local.term); // Raft node cluster rest peers.reset(); local.term.incrementAndGet(); // 设置给自己投票 local.voteFor = local.ip; // update node status to CANDIDATE local.state = RaftPeer.State.CANDIDATE; Map<String, String> params = new HashMap<String, String>(1); params.put("vote", JSON.toJSONString(local)); // 遍历所有的节点信息(除了自己之外) for (final String server : peers.allServersWithoutMySelf()) final String url = buildURL(server, API_VOTE); try HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() @Override public Integer onCompleted(Response response) throws Exception if (response.getStatusCode() != HttpURLConnection.HTTP_OK) Loggers.RAFT.error("NACOS-RAFT vote failed: , url: ", response.getResponseBody(), url); return 1; // 获取投票结果,并进行Leader的选举工作 RaftPeer peer = JSON.parseObject(response.getResponseBody(), RaftPeer.class); Loggers.RAFT.info("received approve from peer: ", JSON.toJSONString(peer)); peers.decideLeader(peer); return 0; ); catch (Exception e) Loggers.RAFT.warn("error while sending vote to server: ", server); |
每个节点启动时,都会认为自己可以作为Leader
,因此都会以自去己作为被选举人,向其他节点发起投票请求,而其他节点在接收到投票请求后的工作流程如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | // 其他节点接收到投票请求后的反应 public RaftPeer receivedVote(RaftPeer remote) // 被选举人是否在raft集群节点列表中 if (!peers.contains(remote)) throw new IllegalStateException("can not find peer: " + remote.ip); // 获取自身节点信息 RaftPeer local = peers.get(NetUtils.localServer()); // 如果被选举节点的周期数小于本节点的周期数,则将自己的投票投给自己并告诉被选举者 if (remote.term.get() <= local.term.get()) String msg = "received illegitimate vote" + ", voter-term:" + remote.term + ", votee-term:" + local.term; Loggers.RAFT.info(msg); if (StringUtils.isEmpty(local.voteFor)) local.voteFor = local.ip; return local; // 满足投票条件后,本节点确认将自己的票投给被选举者 local.resetLeaderDue(); local.state = RaftPeer.State.FOLLOWER; local.voteFor = remote.ip; local.term.set(remote.term.get()); Loggers.RAFT.info("vote as leader, term: ", remote.ip, remote.term); return local; |
通过以上步骤,最终选举出了Leader
节点,接下来,就可以对外提供服务了
因为是CP
模式,所以操作都是通过Leader
节点进行传达的,Follower
节点本身不与Client
进行联系,Follower
只能接受来自Leader
的操作请求,因此就存在请求转发的问题。因此在RaftCore
中的singlePublish
以及singleDelete
中,存在着对Leader
节点的判断以及请求转发的逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | public void signalPublish(String key, Record value) throws Exception if (!isLeader()) JSONObject params = new JSONObject(); params.put("key", key); params.put("value", value); Map<String, String> parameters = new HashMap<>(1); parameters.put("key", key); // 请求转发 raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters); return; ... public void signalDelete(final String key) throws Exception OPERATE_LOCK.lock(); try if (!isLeader()) Map<String, String> params = new HashMap<>(1); params.put("key", URLEncoder.encode(key, "UTF-8")); // 删除请求进行转发给 leader 进行处理 raftProxy.proxy(getLeader().ip, API_DEL, params, HttpMethod.DELETE); return; ... |
同时,还有一个重要的机制——心跳机制,raft
通过心跳机制来维持Leader
以及Follower
的关系
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 | // 心跳任务,如果成为Leader,需要对 follower 发送心跳信息 public class HeartBeat implements Runnable @Override public void run() try // 程序是否已准备完毕 if (!peers.isReady()) return; RaftPeer local = peers.local(); local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS; // 心跳周期判断 if (local.heartbeatDueMs > 0) return; // 重置心跳发送周期 local.resetHeartbeatDue(); // 发送心跳信息 sendBeat(); catch (Exception e) Loggers.RAFT.warn("[RAFT] error while sending beat ", e); public void sendBeat() throws IOException, InterruptedException RaftPeer local = peers.local(); // 如果自己不是Leader节点或者处于单机模式下,则直接返回 if (local.state != RaftPeer.State.LEADER && !STANDALONE_MODE) return; Loggers.RAFT.info("[RAFT] send beat with keys.", datums.size()); // 重置Leader任期时间 local.resetLeaderDue(); // build data JSONObject packet = new JSONObject(); packet.put("peer", local); JSONArray array = new JSONArray(); if (switchDomain.isSendBeatOnly()) Loggers.RAFT.info("[SEND-BEAT-ONLY] ", String.valueOf(switchDomain.isSendBeatOnly())); if (!switchDomain.isSendBeatOnly()) // 如果开启了在心跳包中携带Leader存储的数据进行发送,则对数据进行打包操作 for (Datum datum : datums.values()) JSONObject element = new JSONObject(); if (KeyBuilder.matchServiceMetaKey(datum.key)) element.put("key", KeyBuilder.briefServiceMetaKey(datum.key)); else if (KeyBuilder.matchInstanceListKey(datum.key)) element.put("key", KeyBuilder.briefInstanceListkey(datum.key)); element.put("timestamp", datum.timestamp); array.add(element); else Loggers.RAFT.info("[RAFT] send beat only."); packet.put("datums", array); // broadcast Map<String, String> params = new HashMap<String, String>(1); params.put("beat", JSON.toJSONString(packet)); // 将参数信息进行 Gzip算法压缩,降低网络消耗 String content = JSON.toJSONString(params); ByteArrayOutputStream out = new ByteArrayOutputStream(); GZIPOutputStream gzip = new GZIPOutputStream(out); gzip.write(content.getBytes("UTF-8")); gzip.close(); byte[] compressedBytes = out.toByteArray(); String compressedContent = new String(compressedBytes, "UTF-8"); Loggers.RAFT.info("raw beat data size: , size of compressed data: ", content.length(), compressedContent.length()); // 遍历所有的Follower节点进行发送心跳数据包 for (final String server : peers.allServersWithoutMySelf()) try final String url = buildURL(server, API_BEAT); Loggers.RAFT.info("send beat to server " + server); // 采用异步HTTP请求进行心跳数据发送 HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new AsyncCompletionHandler<Integer>() @Override public Integer onCompleted(Response response) throws Exception if (response.getStatusCode() != HttpURLConnection.HTTP_OK) Loggers.RAFT.error("NACOS-RAFT beat failed: , peer: ", response.getResponseBody(), server); MetricsMonitor.getLeaderSendBeatFailedException().increment(); // 成功后接收Follower节点的心跳回复(Follower节点的当前信息)进行节点更新操作 peers.update(JSON.parseObject(response.getResponseBody(), RaftPeer.class)); Loggers.RAFT.info("receive beat response from: ", url); return 0; @Override public void onThrowable(Throwable t) Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: ", server, t); MetricsMonitor.getLeaderSendBeatFailedException().increment(); ); catch (Exception e) Loggers.RAFT.error("error while sending heart-beat to peer: ", server, e); MetricsMonitor.getLeaderSendBeatFailedException().increment(); |
至于心跳接收的回复操作基本就是Follower
节点将自己当前的信息进行数据打包发送给Leader
节点,同时也会重置当前Leader
的任期时间信息,并且根据接收到心跳信息,进行拉取Leader
节点的最新数据信息
为什么要同时实现CP和AP两套一致性策略模式?
或许有的人会问,为什么Nacos
要同时实现CP
以及AP
两种数据的一致性策略。其实在一个组件中同时实现两种数据一致性策略,我觉得这样在做服务注册中心选型时,就不必操心AP
选什么组件,CP
选什么组件,直接采用nacos
就好了,同时满足你AP
以及CP
的数据一致性需求;直接在一个组件中,享受Zookeeper
以及Eureka
组件的服务,避免了需要同时维护两种不同的组件的运维代价,只需要根据自己的实例需求,选择不同的注册模式即可。
以上是关于nacos1.1.4源码第五天 ap的实现的主要内容,如果未能解决你的问题,请参考以下文章