redis是cp还是ap
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了redis是cp还是ap相关的知识,希望对你有一定的参考价值。
AP 大多数网站架构的选择 CP Redis、Mongodb 注意:分布式架构的时候必须做出取舍。 一致性和可用性之间取一个平衡。多余大多数web应用,其实并不需要强一致性 参考技术A 吗正确的是第1个,你指的第1个就可以,第2个是错误的。 参考技术B 这个的话一般都是cp的,不可能是ap的,知道吗 参考技术C 原来是cp还是AP,这个你可以去查查或者找找专业的一些情况吧。Nacos 是如何同时实现AP与CP的
Nacos 是如何同时实现AP与CP的
两种一致性策略如何在nacos中共存
或许会有疑问,为什么早先的cp
模式的Zookeeper
或者AP
模式的Eureka
,都只有支持CAP
理论下大家常用的AP
实现或者CP
实现,而nacos却能够两个都实现呢?
其实CAP
理论,仅仅是针对分布式下数据的一致性而言,如果你对于数据的一致性要求不高,可忍受最终一致性,那么AP
模式的Eureka
就可以满足你了,如果说你对数据的一致性要求很高,那么就使用CP
模式的Zookeeper
,而追其根本,并不是说Eureka
是AP
的,或者说Zookeeper
是CP
的,而是他们存储的数据的一致性,满足AP
或者CP
,因此也就不难实现在一个组件中实现AP
模式与CP
模式共存
1 2 3 4 5 6 7 8 9 | @Service("consistencyDelegate") public class DelegateConsistencyServiceImpl implements ConsistencyService @Autowired private PersistentConsistencyService persistentConsistencyService; @Autowired private EphemeralConsistencyService ephemeralConsistencyService; |
DelegateConsistencyServiceImpl
是一个一致性策略选择的类,根据不同的策略触发条件(在nacos中,CP
与AP
切换的条件是注册的服务实例是否是临时实例),选择PersistentConsistencyService
策略或者EphemeralConsistencyService
策略,而EphemeralConsistencyService
对应的是DistroConsistencyServiceImpl
,采用的协议是阿里自研的Distro
,我个人觉得就像gossip
协议;PersistentConsistencyService
对应的是RaftConsistencyServiceImpl
,其底层采用的是Raft
协议;这两种一致性策略下的数据存储互不影响,所以nacos
实现了AP
模式与CP
模式在一个组件中同时存在
AP实现
Nacos中的DistroConsistencyServiceImpl工作浅析
Eureka 一致性策略
Eureka是一个AP模式的服务发现框架,在Eureka集群模式下,Eureka采取的是Server之间互相广播各自的数据进行数据复制、更新操作;并且Eureka在客户端与注册中心出现网络故障时,依然能够获取服务注册信息——Eureka实现了客户端对于服务注册信息的缓存
DiscoveryClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | private void fetchRegistryFromBackup() try @SuppressWarnings("deprecation") BackupRegistry backupRegistryInstance = newBackupRegistryInstance(); if (null == backupRegistryInstance) // backward compatibility with the old protected method, in case it is being used. backupRegistryInstance = backupRegistryProvider.get(); if (null != backupRegistryInstance) Applications apps = null; if (isFetchingRemoteRegionRegistries()) String remoteRegionsStr = remoteRegionsToFetch.get(); if (null != remoteRegionsStr) apps = backupRegistryInstance.fetchRegistry(remoteRegionsStr.split(",")); else apps = backupRegistryInstance.fetchRegistry(); if (apps != null) final Applications applications = this.filterAndShuffle(apps); applications.setAppsHashCode(applications.getReconcileHashCode()); localRegionApps.set(applications); logTotalInstances(); logger.info("Fetched registry successfully from the backup"); else logger.warn("No backup registry instance defined & unable to find any discovery servers."); catch (Throwable e) logger.warn("Cannot fetch applications from apps although backup registry was specified", e); |
正因为Eureka为了能够在Eureka集群无法工作时不影响消费者调用服务提供者而设置的客户端缓存,因此Eureka无法保证服务注册信息的强一致性(CP模式),只能满足数据的最终一致性(AP模式)
Nacos一致性策略——Distro
Nacos在AP模式下的一致性策略就类似于Eureka,采用Server
之间互相的数据同步来实现数据在集群中的同步、复制操作。
触发数据广播
1 2 3 4 5 6 7 | DistroConsistencyServiceImpl.java @Override public void put(String key, Record value) throws NacosException onPut(key, value); taskDispatcher.addTask(key); |
当调用ConsistencyService
接口定义的put
、remove
方法时,涉及到了Server
端数据的变更,此时会创建一个任务,将数据的key
传入taskDispatcher.addTask
方法中,用于后面数据变更时数据查找操作
1 2 3 4 5 | TaskDispatcher.java public void addTask(String key) taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key); |
这里有一个方法需要注意——shakeUp
,查看官方代码注解可知这是将key
(key
可以看作是一次数据变更事件)这里应该是将任务均匀的路由到不同的TaskScheduler
对象,确保每个TaskScheduler
所承担的任务都差不多。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 | public class TaskScheduler implements Runnable private int dataSize = 0; private long lastDispatchTime = 0L; private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024); ... public void addTask(String key) queue.offer(key); @Override public void run() List<String> keys = new ArrayList<>(); while (true) try String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),TimeUnit.MILLISECONDS); if (Loggers.EPHEMERAL.isDebugEnabled() && StringUtils.isNotBlank(key)) Loggers.EPHEMERAL.debug("got key: ", key); if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) continue; if (StringUtils.isBlank(key)) continue; if (dataSize == 0) keys = new ArrayList<>(); keys.add(key); dataSize++; if (dataSize == partitionConfig.getBatchSyncKeyCount() || (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) for (Server member : dataSyncer.getServers()) // 自己不需要进行数据广播操作 if (NetUtils.localServer().equals(member.getKey())) continue; SyncTask syncTask = new SyncTask(); syncTask.setKeys(keys); syncTask.setTargetServer(member.getKey()); if (Loggers.EPHEMERAL.isDebugEnabled() && StringUtils.isNotBlank(key)) Loggers.EPHEMERAL.debug("add sync task: ", JSON.toJSONString(syncTask)); dataSyncer.submit(syncTask, 0); lastDispatchTime = System.currentTimeMillis(); dataSize = 0; catch (Exception e) Loggers.EPHEMERAL.error("dispatch sync task failed.", e); |
核心方法就是for (Server member : dataSyncer.getServers()) ..
循环体内的代码,此处就是将数据在Nacos Server
中进行广播操作;具体步骤如下
- 创建`SyncTask`,并设置事件集合(就是`key`集合)
- 将目标`Server`信息设置到`SyncTask`中——`syncTask.setTargetServer(member.getKey())`
- 将数据广播任务提交到`DataSyncer`中
执行数据广播——DataSyncer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 | public void submit(SyncTask task, long delay) // If it's a new task: if (task.getRetryCount() == 0) Iterator<String> iterator = task.getKeys().iterator(); while (iterator.hasNext()) String key = iterator.next(); if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) // associated key already exist: if (Loggers.EPHEMERAL.isDebugEnabled()) Loggers.EPHEMERAL.debug("sync already in process, key: ", key); iterator.remove(); if (task.getKeys().isEmpty()) // all keys are removed: return; GlobalExecutor.submitDataSync(new Runnable() @Override public void run() try if (servers == null || servers.isEmpty()) Loggers.SRV_LOG.warn("try to sync data but server list is empty."); return; List<String> keys = task.getKeys(); if (Loggers.EPHEMERAL.isDebugEnabled()) Loggers.EPHEMERAL.debug("sync keys: ", keys); Map<String, Datum> datumMap = dataStore.batchGet(keys); if (datumMap == null || datumMap.isEmpty()) // clear all flags of this task: for (String key : task.getKeys()) taskMap.remove(buildKey(key, task.getTargetServer())); return; byte[] data = serializer.serialize(datumMap); long timestamp = System.currentTimeMillis(); boolean success = NamingProxy.syncData(data, task.getTargetServer()); if (!success) SyncTask syncTask = new SyncTask(); syncTask.setKeys(task.getKeys()); syncTask.setRetryCount(task.getRetryCount() + 1); syncTask.setLastExecuteTime(timestamp); syncTask.setTargetServer(task.getTargetServer()); retrySync(syncTask); else // clear all flags of this task: for (String key : task.getKeys()) taskMap.remove(buildKey(key, task.getTargetServer())); catch (Exception e) Loggers.EPHEMERAL.error("sync data failed.", e); , delay); |
GlobalExecutor.submitDataSync(Runnable runnable)
提交一个数据广播任务;首先通过SyncTask
中的key
集合去DataStore
中去查询key
所对应的数据集合,然后对数据进行序列化操作,转为byte[]
数组后,执行Http
请求操作——NamingProxy.syncData(data, task.getTargetServer())
;如果数据广播失败,则将任务重新打包再次压入GlobalExecutor
中
(这里有一个疑问,SyncTask记录了任务重试的次数,但是却没有根据该次数做一些判断,比如超过多少次server未响应可能是server挂掉了,这里仅仅是记录了重试的次数)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | public static boolean syncData(byte[] data, String curServer) throws Exception try Map<String, String> headers = new HashMap<>(128); headers.put("Client-Version", UtilsAndCommons.SERVER_VERSION); headers.put("User-Agent", UtilsAndCommons.SERVER_VERSION); headers.put("Accept-Encoding", "gzip,deflate,sdch"); headers.put("Connection", "Keep-Alive"); headers.put("Content-Encoding", "gzip"); HttpClient.HttpResult result = HttpClient.httpPutLarge("http://" + curServer + RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_ON_SYNC_URL, headers, data); if (HttpURLConnection.HTTP_OK == result.code) return true; if (HttpURLConnection.HTTP_NOT_MODIFIED == result.code) return true; throw new IOException("failed to req API:" + "http://" + curServer + RunningConfig.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_ON_SYNC_URL + ". code:" + result.code + " msg: " + result.content); catch (Exception e) Loggers.SRV_LOG.warn("NamingProxy", e); return false; |
这里将数据提交到了URL为PUT http://ip:port/nacos/v1/ns//distro/datum
中,而该URL对应的处理器为DistroController
中的public String onSyncDatum(HttpServletRequest request, HttpServletResponse response)
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | public String onSyncDatum(HttpServletRequest request, HttpServletResponse response) throws Exception String entity = IOUtils.toString(request.getInputStream(), "UTF-8"); if (StringUtils.isBlank(entity)) Loggers.EPHEMERAL.error("[onSync] receive empty entity!"); throw new NacosException(NacosException.INVALID_PARAM, "receive empty entity!"); Map<String, Datum<Instances>> dataMap = serializer.deserializeMap(entity.getBytes(), Instances.class); for (Map.Entry<String, Datum<Instances>> entry : dataMap.entrySet()) if (KeyBuilder.matchEphemeralInstanceListKey(entry.getKey())) String namespaceId = KeyBuilder.getNamespace(entry.getKey()); String serviceName = KeyBuilder.getServiceName(entry.getKey()); if (!serviceManager.containService(namespaceId, serviceName) && switchDomain.isDefaultInstanceEphemeral()) serviceManager.createEmptyService(namespaceId, serviceName, true); consistencyService.onPut(entry.getKey(), entry.getValue().value); return "ok"; |
这里会调用consistencyService.onPut(entry.getKey(), entry.getValue().value)
方法进行数据的更新,注意,onPut
方法并不会涉及taskDispatcher.addTask(key);
操作,而是将数据更新压入了Notifier
的Task
列表中(Notifier
的作用看Nacos Server端注册一个服务实例流程);至此完成了Nacos Server
在AP模式下的数据的最终一致性操作。
CP实现
重要的协议——RAFT
动画演示地址:raft-protocol)
nacos是如何实现CP(raft)的
RaftController
RaftController
控制器是raft
集群内部节点间通信使用的,具体的信息如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | POST HTTP://ip:port/v1/ns/raft/vote : 进行投票请求 POST HTTP://ip:port/v1/ns/raft/beat : Leader向Follower发送心跳信息 GET HTTP://ip:port/v1/ns/raft/peer : 获取该节点的RaftPeer信息 PUT HTTP://ip:port/v1/ns/raft/datum/reload : 重新加载某日志信息 POST HTTP://ip:port/v1/ns/raft/datum : Leader接收传来的数据并存入 DELETE HTTP://ip:port/v1/ns/raft/datum : Leader接收传来的数据删除操作 GET HTTP://ip:port/v1/ns/raft/datum : 获取该节点存储的数据信息 GET HTTP://ip:port/v1/ns/raft/state : 获取该节点的状态信息UP or DOWN POST HTTP://ip:port/v1/ns/raft/datum/commit : Follower节点接收Leader传来得到数据存入操作 DELETE HTTP://ip:port/v1/ns/raft/datum : Follower节点接收Leader传来的数据删除操作 GET HTTP://ip:port/v1/ns/raft/leader : 获取当前集群的Leader节点信息 GET HTTP://ip:port/v1/ns/raft/listeners : 获取当前Raft集群的所有事件监听者 |
RaftPeerSet
这个对象存储的是所有raft
协议下的节点信息,存储的元素如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | // 集群节点地址管理 private ServerListManager serverListManager; // 周期数 private AtomicLong localTerm = new AtomicLong(0L); // 当前周期内的Leader private RaftPeer leader = null; // 所有的节点信息 private Map<String, RaftPeer> peers = new HashMap<String, RaftPeer>(); // 暂时不清楚用途 private Set<String> sites = new HashSet<>(); // 本节点是否已准备完毕 private boolean ready = false; |
同时还具备了raft
协议下必要的方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 | // 当前IP对应的节点是否是Leader public boolean isLeader(String ip) if (STANDALONE_MODE) return true; if (leader == null) Loggers.RAFT.warn("[IS LEADER] no leader is available now!"); return false; return StringUtils.equals(leader.ip, ip); // 决定Leader节点,根据投票结果以及是否满足majorityCount机制 public RaftPeer decideLeader(RaftPeer candidate) peers.put(candidate.ip, candidate); SortedBag ips = new TreeBag(); int maxApproveCount = 0; String maxApprovePeer = null; for (RaftPeer peer : peers.values()) if (StringUtils.isEmpty(peer.voteFor)) continue; // 选票计数 ips.add(peer.voteFor); // 如果某节点的得票数大于当前的最大得票数,则更新候选Leader信息 if (ips.getCount(peer.voteFor) > maxApproveCount) maxApproveCount = ips.getCount(peer.voteFor); maxApprovePeer = peer.voteFor; // 是否满足majorityCount数量的限制 if (maxApproveCount >= majorityCount()) // 若满足则设置Leader节点信息 RaftPeer peer = peers.get(maxApprovePeer); peer.state = RaftPeer.State.LEADER; if (!Objects.equals(leader, peer)) leader = peer; Loggers.RAFT.info(" has become the LEADER", leader.ip); return leader; public RaftPeer makeLeader(RaftPeer candidate) // 如果当前Leader与Candidate节点不一样,则进行Leader信息更改 if (!Objects.equals(leader, candidate)) leader = candidate; Loggers.RAFT.info(" has become the LEADER, local: , leader: ", leader.ip, JSON.toJSONString(local()), JSON.toJSONString(leader)); for (final RaftPeer peer : peers.values()) Map<String, String> params = new HashMap<String, String>(1); // 如果当前节点与远程Leader节点不等且是Follower节点 if (!Objects.equals(peer, candidate) && peer.state == RaftPeer.State.LEADER) try // 获取每个节点的RaftPeer节点信息对象数据 String url = RaftCore.buildURL(peer.ip, RaftCore.API_GET_PEER); HttpClient.asyncHttpGet(url, null, params, new AsyncCompletionHandler<Integer>() @Override public Integer onCompleted(Response response) throws Exception if (response.getStatusCode() != HttpURLConnection.HTTP_OK) Loggers.RAFT.error("[NACOS-RAFT] get peer failed: , peer: ", response.getResponseBody(), peer.ip); peer.state = RaftPeer.State.FOLLOWER; return 1; update(JSON.parseObject(response.getResponseBody(), RaftPeer.class)); return 0; ); catch (Exception e) peer.state = RaftPeer.State.FOLLOWER; Loggers.RAFT.error("[NACOS-RAFT] error while getting peer from peer: ", peer.ip); return update(candidate); |
RaftCore
该对象是nacos
中raft
协议的主要实现,在启动之初,会进行一系列初始化的操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | @PostConstruct public void init() throws Exception Loggers.RAFT.info("initializing Raft sub-system"); executor.submit(notifier); long start = System.currentTimeMillis(); // 进行日志文件的加载到内存数据对象Datums的操作 datums = raftStore.loadDatums(notifier); // 设置当前的周期数 setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L)); Loggers.RAFT.info("cache loaded, datum count: , current term: ", datums.size(), peers.getTerm()); while (true) // 等待上一步的数据加载任务全部完成 if (notifier.tasks.size() <= 0) break; Thread.sleep(1000L); // 初始化标识更改 initialized = true; Loggers.RAFT.info("finish to load data from disk, cost: ms.", (System.currentTimeMillis() - start)); // 开启定时的Leader选举任务 GlobalExecutor.registerMasterElection(new MasterElection()); // 开启定时的Leader心跳服务 GlobalExecutor.registerHeartbeat(new HeartBeat()); Loggers.RAFT.info("timer started: leader timeout ms: , heart-beat timeout ms: ", GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS); |
初始化的一系列操作完成后,此时集群还无法对外提供服务,因为此时Leader
还未选举出来,需要在MasterElection
选举Leader
成功后才可以对外提供服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 | // Leader 选举任务 public class MasterElection implements Runnable @Override public void run() try // 当前节点是否已准备完毕 if (!peers.isReady()) return; // 获取自身节点信息 RaftPeer local = peers.local(); // 本地存储的Leader任期时间 local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS; // 如果Leader任期时间还在允许范围内,则不进行Leader选举 if (local.leaderDueMs > 0) return; // reset timeout local.resetLeaderDue(); local.resetHeartbeatDue(); // 向其他节点发起投票请求 sendVote(); catch (Exception e) Loggers.RAFT.warn("[RAFT] error while master election ", e); public void sendVote() RaftPeer local = peers.get(NetUtils.localServer()); Loggers.RAFT.info("leader timeout, start voting,leader: , term: ", JSON.toJSONString(getLeader()), local.term); // Raft node cluster rest peers.reset(); local.term.incrementAndGet(); // 设置给自己投票 local.voteFor = local.ip; // update node status to CANDIDATE local.state = RaftPeer.State.CANDIDATE; Map<String, String> params = new HashMap<String, String>(1); params.put("vote", JSON.toJSONString(local)); // 遍历所有的节点信息(除了自己之外) for (final String server : peers.allServersWithoutMySelf()) final String url = buildURL(server, API_VOTE); try HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() @Override public Integer onCompleted(Response response) throws Exception if (response.getStatusCode() != HttpURLConnection.HTTP_OK) Loggers.RAFT.error("NACOS-RAFT vote failed: , url: ", response.getResponseBody(), url); return 1; // 获取投票结果,并进行Leader的选举工作 RaftPeer peer = JSON.parseObject(response.getResponseBody(), RaftPeer.class); Loggers.RAFT.info("received approve from peer: ", JSON.toJSONString(peer)); peers.decideLeader(peer); return 0; ); catch (Exception e) Loggers.RAFT.warn("error while sending vote to server: ", server); |
每个节点启动时,都会认为自己可以作为Leader
,因此都会以自去己作为被选举人,向其他节点发起投票请求,而其他节点在接收到投票请求后的工作流程如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | // 其他节点接收到投票请求后的反应 public RaftPeer receivedVote(RaftPeer remote) // 被选举人是否在raft集群节点列表中 if (!peers.contains(remote)) throw new IllegalStateException("can not find peer: " + remote.ip); // 获取自身节点信息 RaftPeer local = peers.get(NetUtils.localServer()); // 如果被选举节点的周期数小于本节点的周期数,则将自己的投票投给自己并告诉被选举者 if (remote.term.get() <= local.term.get()) String msg = "received illegitimate vote" + ", voter-term:" + remote.term + ", votee-term:" + local.term; Loggers.RAFT.info(msg); if (StringUtils.isEmpty(local.voteFor)) local.voteFor = local.ip; return local; // 满足投票条件后,本节点确认将自己的票投给被选举者 local.resetLeaderDue(); local.state = RaftPeer.State.FOLLOWER; local.voteFor = remote.ip; local.term.set(remote.term.get()); Loggers.RAFT.info("vote as leader, term: ", remote.ip, remote.term); return local; |
通过以上步骤,最终选举出了Leader
节点,接下来,就可以对外提供服务了
因为是CP
模式,所以操作都是通过Leader
节点进行传达的,Follower
节点本身不与Client
进行联系,Follower
只能接受来自Leader
的操作请求,因此就存在请求转发的问题。因此在RaftCore
中的singlePublish
以及singleDelete
中,存在着对Leader
节点的判断以及请求转发的逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | public void signalPublish(String key, Record value) throws Exception if (!isLeader()) JSONObject params = new JSONObject(); params.put("key", key); params.put("value", value); Map<String, String> parameters = new HashMap<>(1); parameters.put("key", key); // 请求转发 raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters); return; ... public void signalDelete(final String key) throws Exception OPERATE_LOCK.lock(); try if (!isLeader()) Map<String, String> params = new HashMap<>(1); params.put("key", URLEncoder.encode(key, "UTF-8")); // 删除请求进行转发给 leader 进行处理 raftProxy.proxy(getLeader().ip, API_DEL, params, HttpMethod.DELETE); return; ... |
同时,还有一个重要的机制——心跳机制,raft
通过心跳机制来维持Leader
以及Follower
的关系
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 | // 心跳任务,如果成为Leader,需要对 follower 发送心跳信息 public class HeartBeat implements Runnable @Override public void run() try // 程序是否已准备完毕 if (!peers.isReady()) return; RaftPeer local = peers.local(); local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS; // 心跳周期判断 if (local.heartbeatDueMs > 0) return; // 重置心跳发送周期 local.resetHeartbeatDue(); // 发送心跳信息 sendBeat(); catch (Exception e) Loggers.RAFT.warn("[RAFT] error while sending beat ", e); public void sendBeat() throws IOException, InterruptedException RaftPeer local = peers.local(); // 如果自己不是Leader节点或者处于单机模式下,则直接返回 if (local.state != RaftPeer.State.LEADER && !STANDALONE_MODE) return; Loggers.RAFT.info("[RAFT] send beat with keys.", datums.size()); // 重置Leader任期时间 local.resetLeaderDue(); // build data JSONObject packet = new JSONObject(); packet.put("peer", local); JSONArray array = new JSONArray(); if (switchDomain.isSendBeatOnly()) Loggers.RAFT.info("[SEND-BEAT-ONLY] ", String.valueOf(switchDomain.isSendBeatOnly())); if (!switchDomain.isSendBeatOnly()) // 如果开启了在心跳包中携带Leader存储的数据进行发送,则对数据进行打包操作 for (Datum datum : datums.values()) JSONObject element = new JSONObject(); if (KeyBuilder.matchServiceMetaKey(datum.key)) element.put("key", KeyBuilder.briefServiceMetaKey(datum.key)); else if (KeyBuilder.matchInstanceListKey(datum.key)) element.put("key", KeyBuilder.briefInstanceListkey(datum.key)); element.put("timestamp", datum.timestamp); array.add(element); else Loggers.RAFT.info("[RAFT] send beat only."); packet.put("datums", array); // broadcast Map<String, String> params = new HashMap<String, String>(1); params.put("beat", JSON.toJSONString(packet)); // 将参数信息进行 Gzip算法压缩,降低网络消耗 String content = JSON.toJSONString(params); ByteArrayOutputStream out = new ByteArrayOutputStream(); GZIPOutputStream gzip = new GZIPOutputStream(out); gzip.write(content.getBytes("UTF-8")); gzip.close(); byte[] compressedBytes = out.toByteArray(); String compressedContent = new String(compressedBytes, "UTF-8"); Loggers.RAFT.info("raw beat data size: , size of compressed data: ", content.length(), compressedContent.length()); // 遍历所有的Follower节点进行发送心跳数据包 for (final String server : peers.allServersWithoutMySelf()) try final String url = buildURL(server, API_BEAT); Loggers.RAFT.info("send beat to server " + server); // 采用异步HTTP请求进行心跳数据发送 HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new AsyncCompletionHandler<Integer>() @Override public Integer onCompleted(Response response) throws Exception if (response.getStatusCode() != HttpURLConnection.HTTP_OK) Loggers.RAFT.error("NACOS-RAFT beat failed: , peer: ", response.getResponseBody(), server); MetricsMonitor.getLeaderSendBeatFailedException().increment(); // 成功后接收Follower节点的心跳回复(Follower节点的当前信息)进行节点更新操作 peers.update(JSON.parseObject(response.getResponseBody(), RaftPeer.class)); Loggers.RAFT.info("receive beat response from: ", url); return 0; @Override public void onThrowable(Throwable t) Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: ", server, t); MetricsMonitor.getLeaderSendBeatFailedException().increment(); ); catch (Exception e) Loggers.RAFT.error("error while sending heart-beat to peer: ", server, e); MetricsMonitor.getLeaderSendBeatFailedException().increment(); |
至于心跳接收的回复操作基本就是Follower
节点将自己当前的信息进行数据打包发送给Leader
节点,同时也会重置当前Leader
的任期时间信息,并且根据接收到心跳信息,进行拉取Leader
节点的最新数据信息
为什么要同时实现CP和AP两套一致性策略模式?
或许有的人会问,为什么Nacos
要同时实现CP
以及AP
两种数据的一致性策略。其实在一个组件中同时实现两种数据一致性策略,我觉得这样在做服务注册中心选型时,就不必操心AP
选什么组件,CP
选什么组件,直接采用nacos
就好了,同时满足你AP
以及CP
的数据一致性需求;直接在一个组件中,享受Zookeeper
以及Eureka
组件的服务,避免了需要同时维护两种不同的组件的运维代价,只需要根据自己的实例需求,选择不同的注册模式即可。
以上是关于redis是cp还是ap的主要内容,如果未能解决你的问题,请参考以下文章
Nacos一致性协议 CP/AP/JRaft/Distro协议
谈谈注册中心 zookeeper 和 eureka 中的 CP和 AP