How Nacos Implements Both AP and CP

Posted 黄小斜


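  1. How the two consistency strategies coexist in Nacos
  2. AP implementation
  3. CP implementation
    1. An important protocol: Raft
    2. How Nacos implements CP (Raft)
  4. Why implement both CP and AP consistency strategies?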

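How the two consistency strategies coexist in Nacos

You may wonder why earlier registries such as Zookeeper (CP) or Eureka (AP) each support only one side of the CAP trade-off, either AP or CP, while Nacos manages to support both.

The CAP theorem only concerns the consistency of data in a distributed system. If your consistency requirements are modest and you can tolerate eventual consistency, AP-mode Eureka is enough; if you need strong consistency, use CP-mode Zookeeper. Fundamentally, it is not that Eureka "is AP" or that Zookeeper "is CP"; rather, the consistency of the data they store satisfies AP or CP. Seen this way, it is not hard to have AP mode and CP mode coexist in a single component.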

@Service("consistencyDelegate")
public class DelegateConsistencyServiceImpl implements ConsistencyService {

    @Autowired
    private PersistentConsistencyService persistentConsistencyService;

    @Autowired
    private EphemeralConsistencyService ephemeralConsistencyService;

    // ... (the rest of the class is not shown in this excerpt)
}

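DelegateConsistencyServiceImpl is the class that selects the consistency strategy. Depending on the trigger condition (in Nacos, the switch between CP and AP depends on whether the registered service instance is ephemeral), it selects either the PersistentConsistencyService strategy or the EphemeralConsistencyService strategy. EphemeralConsistencyService corresponds to DistroConsistencyServiceImpl, which uses Distro, a protocol developed in-house at Alibaba that, in my personal view, resembles a gossip protocol. PersistentConsistencyService corresponds to RaftConsistencyServiceImpl, which is built on the Raft protocol. The data stored under the two strategies do not affect each other, which is how Nacos lets AP mode and CP mode exist in one component at the same time.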
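To make the delegation idea concrete, here is a minimal sketch of routing each operation to one of two independent stores based on the key. The interface, the class names, and the `ephemeral.` key prefix are assumptions made for illustration; they are not the real Nacos types, and the two in-memory maps merely stand in for the Distro-backed and Raft-backed services.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a consistency service.
interface SimpleConsistencyService {
    void put(String key, String value);
    String get(String key);
}

// Trivial in-memory store used for both sides of the sketch.
final class InMemoryService implements SimpleConsistencyService {
    private final Map<String, String> store = new HashMap<>();
    public void put(String key, String value) { store.put(key, value); }
    public String get(String key) { return store.get(key); }
}

// Delegate that picks a strategy per key; the two stores never share data.
final class DelegateSketch implements SimpleConsistencyService {
    // Assumed marker: keys of ephemeral instances carry this prefix.
    static final String EPHEMERAL_PREFIX = "ephemeral.";

    final SimpleConsistencyService ephemeral = new InMemoryService();  // AP side
    final SimpleConsistencyService persistent = new InMemoryService(); // CP side

    SimpleConsistencyService route(String key) {
        return key.startsWith(EPHEMERAL_PREFIX) ? ephemeral : persistent;
    }

    public void put(String key, String value) { route(key).put(key, value); }
    public String get(String key) { return route(key).get(key); }
}
```

Because the routing decision depends only on the key, reads and writes for the same key always reach the same store, which is what keeps the two strategies from interfering with each other.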

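AP implementation

A brief look at how DistroConsistencyServiceImpl works in Nacos

Eureka's consistency strategy

Eureka is an AP-mode service discovery framework. In cluster mode, Eureka servers broadcast their data to one another to replicate and update it. In addition, when the network between a client and the registry fails, the client can still obtain service registration information, because Eureka caches registration information on the client side.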

DiscoveryClient

private void fetchRegistryFromBackup() {
    try {
        @SuppressWarnings("deprecation")
        BackupRegistry backupRegistryInstance = newBackupRegistryInstance();
        if (null == backupRegistryInstance) { // backward compatibility with the old protected method, in case it is being used.
            backupRegistryInstance = backupRegistryProvider.get();
        }

        if (null != backupRegistryInstance) {
            Applications apps = null;
            if (isFetchingRemoteRegionRegistries()) {
                String remoteRegionsStr = remoteRegionsToFetch.get();
                if (null != remoteRegionsStr) {
                    apps = backupRegistryInstance.fetchRegistry(remoteRegionsStr.split(","));
                }
            } else {
                apps = backupRegistryInstance.fetchRegistry();
            }
            if (apps != null) {
                final Applications applications = this.filterAndShuffle(apps);
                applications.setAppsHashCode(applications.getReconcileHashCode());
                localRegionApps.set(applications);
                logTotalInstances();
                logger.info("Fetched registry successfully from the backup");
            }
        } else {
            logger.warn("No backup registry instance defined & unable to find any discovery servers.");
        }
    } catch (Throwable e) {
        logger.warn("Cannot fetch applications from apps although backup registry was specified", e);
    }
}

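Because Eureka adds this client-side cache so that consumers can keep calling providers even when the Eureka cluster is unavailable, it cannot guarantee strong consistency of registration information (CP mode); it can only provide eventual consistency (AP mode).

Nacos's consistency strategy: Distro

In AP mode, Nacos's consistency strategy is similar to Eureka's: servers synchronize data with one another to replicate it across the cluster.

Triggering the data broadcast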

DistroConsistencyServiceImpl.java

@Override
public void put(String key, Record value) throws NacosException {
    // Apply the change locally, then schedule it for broadcast to the other servers.
    onPut(key, value);
    taskDispatcher.addTask(key);
}

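When the put or remove method defined by the ConsistencyService interface is called, data on the server changes. A task is then created and the key of the changed data is passed to taskDispatcher.addTask, so that the data can be looked up later when the change is broadcast.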

TaskDispatcher.java

public void addTask(String key) {
    // Pick one of the schedulers based on the key and hand the task to it.
    taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);
}

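One method worth noting here is shakeUp. According to the comments in the official code, it maps the key (a key can be regarded as one data-change event) to an index. The intent appears to be to route tasks evenly across the TaskScheduler objects, so that each TaskScheduler carries roughly the same load.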
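A common way to spread keys over a fixed number of buckets is to take the key's hash code modulo the bucket count. The sketch below illustrates that idea only; it is an assumption about how such a method could look and is not copied from UtilsAndCommons, whose actual implementation may differ. The class name is hypothetical.

```java
final class ShakeUpSketch {
    // Map a key to a bucket index in [0, upperLimit).
    // Masking with Integer.MAX_VALUE clears the sign bit, so keys whose
    // hashCode() is negative still produce a valid, non-negative index.
    static int shakeUp(String key, int upperLimit) {
        if (upperLimit < 1) {
            throw new IllegalArgumentException("upperLimit must be at least 1");
        }
        if (key == null) {
            return 0;
        }
        return (key.hashCode() & Integer.MAX_VALUE) % upperLimit;
    }
}
```

The same key always lands in the same bucket, so successive changes to one key are handled by one scheduler, while different keys are distributed roughly evenly as long as their hash codes are well spread.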

public class TaskScheduler implements Runnable {
    private int dataSize = 0;
    private long lastDispatchTime = 0L;
    private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);
    ...
    public void addTask(String key) {
        queue.offer(key);
    }

    @Override
    public void run() {
        List<String> keys = new ArrayList<>();
        while (true) {
            try {
                String key = queue.poll(partitionConfig.getTaskDispatchPeriod(), TimeUnit.MILLISECONDS);
                if (Loggers.EPHEMERAL.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                    Loggers.EPHEMERAL.debug("got key: {}", key);
                }
                if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
                    continue;
                }
                if (StringUtils.isBlank(key)) {
                    continue;
                }
                if (dataSize == 0) {
                    keys = new ArrayList<>();
                }
                keys.add(key);
                dataSize++;
                // Dispatch when the batch is full or the dispatch period has elapsed.
                if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
                        (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {
                    for (Server member : dataSyncer.getServers()) {
                        // The local server does not need to broadcast to itself.
                        if (NetUtils.localServer().equals(member.getKey())) {
                            continue;
                        }
                        SyncTask syncTask = new SyncTask();
                        syncTask.setKeys(keys);
                        syncTask.setTargetServer(member.getKey());
                        if (Loggers.EPHEMERAL.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                            Loggers.EPHEMERAL.debug("add sync task: {}", JSON.toJSONString(syncTask));
                        }
                        dataSyncer.submit(syncTask, 0);
                    }
                    lastDispatchTime = System.currentTimeMillis();
                    dataSize = 0;
                }
            } catch (Exception e) {
                Loggers.EPHEMERAL.error("dispatch sync task failed.", e);
            }
        }
    }
}

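The core is the body of the for (Server member : dataSyncer.getServers()) loop, which broadcasts the data to the other Nacos servers. The steps are:

- Create a `SyncTask` and set its event collection (that is, the collection of `key`s)
- Set the target `Server` on the `SyncTask` via `syncTask.setTargetServer(member.getKey())`
- Submit the broadcast task to `DataSyncer`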
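The scheduler only broadcasts once enough keys have accumulated or enough time has passed since the last dispatch. The following sketch isolates that "flush on count or on elapsed time" rule. The class and method names are hypothetical, and the clock is passed in as a parameter purely so the behaviour is easy to follow and to test; it is not how TaskScheduler obtains the time.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchSketch {
    private final int batchSize;   // flush when this many keys are pending
    private final long periodMs;   // or when this much time has passed
    private List<String> keys = new ArrayList<>();
    private long lastDispatchTime;

    BatchSketch(int batchSize, long periodMs, long startMs) {
        this.batchSize = batchSize;
        this.periodMs = periodMs;
        this.lastDispatchTime = startMs;
    }

    // Adds a key; returns the batch to broadcast, or an empty list
    // if neither threshold has been reached yet.
    List<String> offer(String key, long nowMs) {
        keys.add(key);
        if (keys.size() >= batchSize || nowMs - lastDispatchTime > periodMs) {
            List<String> batch = keys;
            keys = new ArrayList<>();     // start a fresh batch
            lastDispatchTime = nowMs;
            return batch;
        }
        return new ArrayList<>();
    }
}
```

With a batch size of 3 and a period of 1000 ms, the third key triggers a flush immediately, while a lone key that arrives more than a second after the previous flush is sent on its own.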

Executing the data broadcast: DataSyncer

public void submit(SyncTask task, long delay) {
    // If it's a new task:
    if (task.getRetryCount() == 0) {
        Iterator<String> iterator = task.getKeys().iterator();
        while (iterator.hasNext()) {
            String key = iterator.next();
            if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) {
                // associated key already exist:
                if (Loggers.EPHEMERAL.isDebugEnabled()) {
                    Loggers.EPHEMERAL.debug("sync already in process, key: {}", key);
                }
                iterator.remove();
            }
        }
    }
    if (task.getKeys().isEmpty()) {
        // all keys are removed:
        return;
    }
    GlobalExecutor.submitDataSync(new Runnable() {
        @Override
        public void run() {
            try {
                if (servers == null || servers.isEmpty()) {
                    Loggers.SRV_LOG.warn("try to sync data but server list is empty.");
                    return;
                }
                List<String> keys = task.getKeys();
                if (Loggers.EPHEMERAL.isDebugEnabled()) {
                    Loggers.EPHEMERAL.debug("sync keys: {}", keys);
                }
                Map<String, Datum> datumMap = dataStore.batchGet(keys);
                if (datumMap == null || datumMap.isEmpty()) {
                    // clear all flags of this task:
                    for (String key : task.getKeys()) {
                        taskMap.remove(buildKey(key, task.getTargetServer()));
                    }
                    return;
                }
                byte[] data = serializer.serialize(datumMap);
                long timestamp = System.currentTimeMillis();
                boolean success = NamingProxy.syncData(data, task.getTargetServer());
                if (!success) {
                    // Repackage the task with an incremented retry count and try again.
                    SyncTask syncTask = new SyncTask();
                    syncTask.setKeys(task.getKeys());
                    syncTask.setRetryCount(task.getRetryCount() + 1);
                    syncTask.setLastExecuteTime(timestamp);
                    syncTask.setTargetServer(task.getTargetServer());
                    retrySync(syncTask);
                } else {
                    // clear all flags of this task:
                    for (String key : task.getKeys()) {
                        taskMap.remove(buildKey(key, task.getTargetServer()));
                    }
                }
            } catch (Exception e) {
                Loggers.EPHEMERAL.error("sync data failed.", e);
            }
        }
    }, delay);
}

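GlobalExecutor.submitDataSync submits a data broadcast task. The task first uses the keys in the SyncTask to look up the corresponding data in DataStore, serializes that data into a byte[] array, and then performs an HTTP request via NamingProxy.syncData(data, task.getTargetServer()). If the broadcast fails, the task is repackaged and submitted to GlobalExecutor again.

(One open question: SyncTask records the number of retries, but nothing is decided based on that number, for example treating a server as possibly down after it has failed to respond a certain number of times. Here the retry count is merely recorded.)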
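As an illustration of what acting on the retry count could look like, here is a small sketch of a send loop that gives up after a fixed number of retries. This is not what the code above does: the cap `maxRetries`, the class name, and the use of a `Predicate` as the sender are all assumptions of the sketch, introduced only to show one possible answer to the question.

```java
import java.util.function.Predicate;

final class RetrySketch {
    // Tries to send the payload, retrying on failure up to maxRetries times.
    // Returns the number of attempts used on success, or -1 if every attempt
    // failed (the caller could then treat the target as possibly down).
    static int syncWithRetry(Predicate<byte[]> sender, byte[] data, int maxRetries) {
        for (int retryCount = 0; retryCount <= maxRetries; retryCount++) {
            if (sender.test(data)) {
                return retryCount + 1;
            }
        }
        return -1;
    }
}
```

A real implementation would also wait between attempts, as the `delay` argument of `submit` suggests; the sketch leaves that out to keep the control flow visible.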

public static boolean syncData(byte[] data, String curServer) throws Exception {
    try {
        Map<String, String> headers = new HashMap<>(128);
        headers.put("Client-Version", UtilsAndCommons.SERVER_VERSION);
        headers.put("User-Agent", UtilsAndCommons.SERVER_VERSION);
        headers.put("Accept-Encoding", "gzip,deflate,sdch");
        headers.put("Connection", "Keep-Alive");
        headers.put("Content-Encoding", "gzip");

        HttpClient.HttpResult result = HttpClient.httpPutLarge("http://" + curServer + RunningConfig.getContextPath()
                + UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_ON_SYNC_URL, headers, data);

        if (HttpURLConnection.HTTP_OK == result.code) {
            return true;
        }
        if (HttpURLConnection.HTTP_NOT_MODIFIED == result.code) {
            return true;
        }
        throw new IOException("failed to req API:" + "http://" + curServer
                + RunningConfig.getContextPath()
                + UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_ON_SYNC_URL + ". code:"
                + result.code + " msg: " + result.content);
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("NamingProxy", e);
    }
    return false;
}

The data is submitted to PUT http://ip:port/nacos/v1/ns/distro/datum, which is handled by the method public String onSyncDatum(HttpServletRequest request, HttpServletResponse response) in DistroController.

public String onSyncDatum(HttpServletRequest request, HttpServletResponse response) throws Exception {

    String entity = IOUtils.toString(request.getInputStream(), "UTF-8");

    if (StringUtils.isBlank(entity)) {
        Loggers.EPHEMERAL.error("[onSync] receive empty entity!");
        throw new NacosException(NacosException.INVALID_PARAM, "receive empty entity!");
    }

    Map<String, Datum<Instances>> dataMap = serializer.deserializeMap(entity.getBytes(), Instances.class);
    for (Map.Entry<String, Datum<Instances>> entry : dataMap.entrySet()) {
        if (KeyBuilder.matchEphemeralInstanceListKey(entry.getKey())) {
            String namespaceId = KeyBuilder.getNamespace(entry.getKey());
            String serviceName = KeyBuilder.getServiceName(entry.getKey());
            if (!serviceManager.containService(namespaceId, serviceName) && switchDomain.isDefaultInstanceEphemeral()) {
                serviceManager.createEmptyService(namespaceId, serviceName, true);
            }
            consistencyService.onPut(entry.getKey(), entry.getValue().value);
        }
    }
    return "ok";
}

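This calls consistencyService.onPut(entry.getKey(), entry.getValue().value) to update the data. Note that onPut does not call taskDispatcher.addTask(key); instead, it pushes the update onto the Notifier's task list (for the role of Notifier, see the flow for registering a service instance on the Nacos server side). This completes the eventual-consistency handling of data among Nacos servers in AP mode.

CP implementation

An important protocol: Raft

Animated demonstration: raft-protocol

How Nacos implements CP (Raft)

RaftController

RaftController is the controller used for communication between nodes inside the Raft cluster. Its endpoints are: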

POST http://ip:port/v1/ns/raft/vote : request a vote

POST http://ip:port/v1/ns/raft/beat : the Leader sends a heartbeat to a Follower

GET http://ip:port/v1/ns/raft/peer : get this node's RaftPeer information

PUT http://ip:port/v1/ns/raft/datum/reload : reload a given log entry

POST http://ip:port/v1/ns/raft/datum : the Leader receives incoming data and stores it

DELETE http://ip:port/v1/ns/raft/datum : the Leader receives a request to delete data

GET http://ip:port/v1/ns/raft/datum : get the data stored on this node

GET http://ip:port/v1/ns/raft/state : get this node's state, UP or DOWN

POST http://ip:port/v1/ns/raft/datum/commit : a Follower receives a store operation from the Leader

DELETE http://ip:port/v1/ns/raft/datum/commit : a Follower receives a delete operation from the Leader

GET http://ip:port/v1/ns/raft/leader : get the current Leader of the cluster

GET http://ip:port/v1/ns/raft/listeners : get all event listeners of the current Raft cluster

RaftPeerSet

This object stores information about all nodes participating in the Raft protocol. Its fields are:

// Manages the addresses of the cluster nodes
private ServerListManager serverListManager;

// Current term
private AtomicLong localTerm = new AtomicLong(0L);

// Leader of the current term
private RaftPeer leader = null;

// Information about all nodes
private Map<String, RaftPeer> peers = new HashMap<String, RaftPeer>();

// Purpose unclear for now
private Set<String> sites = new HashSet<>();

// Whether this node has finished preparing
private boolean ready = false;

It also provides the methods that the Raft protocol requires:

// Whether the node with the given IP is the Leader
public boolean isLeader(String ip) {
    if (STANDALONE_MODE) {
        return true;
    }
    if (leader == null) {
        Loggers.RAFT.warn("[IS LEADER] no leader is available now!");
        return false;
    }
    return StringUtils.equals(leader.ip, ip);
}

// Decide the Leader from the votes received so far, subject to the majorityCount rule
public RaftPeer decideLeader(RaftPeer candidate) {
    peers.put(candidate.ip, candidate);
    SortedBag ips = new TreeBag();
    int maxApproveCount = 0;
    String maxApprovePeer = null;
    for (RaftPeer peer : peers.values()) {
        if (StringUtils.isEmpty(peer.voteFor)) {
            continue;
        }
        // Count the vote
        ips.add(peer.voteFor);
        // If this node now has more votes than the current maximum, it becomes the leading candidate
        if (ips.getCount(peer.voteFor) > maxApproveCount) {
            maxApproveCount = ips.getCount(peer.voteFor);
            maxApprovePeer = peer.voteFor;
        }
    }
    // Has the leading candidate reached a majority?
    if (maxApproveCount >= majorityCount()) {
        // If so, record it as the Leader
        RaftPeer peer = peers.get(maxApprovePeer);
        peer.state = RaftPeer.State.LEADER;

        if (!Objects.equals(leader, peer)) {
            leader = peer;
            Loggers.RAFT.info("{} has become the LEADER", leader.ip);
        }
    }
    return leader;
}

public RaftPeer makeLeader(RaftPeer candidate) {
    // If the current Leader differs from the candidate, switch to the candidate
    if (!Objects.equals(leader, candidate)) {
        leader = candidate;
        Loggers.RAFT.info("{} has become the LEADER, local: {}, leader: {}",
            leader.ip, JSON.toJSONString(local()), JSON.toJSONString(leader));
    }

    for (final RaftPeer peer : peers.values()) {
        Map<String, String> params = new HashMap<String, String>(1);
        // For any other peer that is still marked as LEADER (a stale leader)
        if (!Objects.equals(peer, candidate) && peer.state == RaftPeer.State.LEADER) {
            try {
                // Fetch that peer's current RaftPeer information
                String url = RaftCore.buildURL(peer.ip, RaftCore.API_GET_PEER);
                HttpClient.asyncHttpGet(url, null, params, new AsyncCompletionHandler<Integer>() {
                    @Override
                    public Integer onCompleted(Response response) throws Exception {
                        if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                            Loggers.RAFT.error("[NACOS-RAFT] get peer failed: {}, peer: {}", response.getResponseBody(), peer.ip);
                            peer.state = RaftPeer.State.FOLLOWER;
                            return 1;
                        }
                        update(JSON.parseObject(response.getResponseBody(), RaftPeer.class));
                        return 0;
                    }
                });
            } catch (Exception e) {
                peer.state = RaftPeer.State.FOLLOWER;
                Loggers.RAFT.error("[NACOS-RAFT] error while getting peer from peer: {}", peer.ip);
            }
        }
    }
    return update(candidate);
}
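The tallying step in decideLeader can be isolated as follows. This is a simplified sketch under stated assumptions: votes are plain strings rather than RaftPeer objects, a HashMap replaces the TreeBag, and majorityCount is taken to be the usual Raft majority (more than half of the cluster), which the listing above does not show. The class name is hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class VoteTallySketch {
    // Assumed definition: strictly more than half of the cluster.
    static int majorityCount(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    // Returns the candidate holding a majority of votes, or null if no
    // candidate has reached a majority yet. Empty votes are ignored.
    static String decideLeader(List<String> votes, int clusterSize) {
        Map<String, Integer> counts = new HashMap<>();
        String best = null;
        int bestCount = 0;
        for (String vote : votes) {
            if (vote == null || vote.isEmpty()) {
                continue; // this peer has not voted
            }
            int count = counts.merge(vote, 1, Integer::sum);
            if (count > bestCount) {
                bestCount = count;
                best = vote;
            }
        }
        return bestCount >= majorityCount(clusterSize) ? best : null;
    }
}
```

In a three-node cluster two votes are enough, so a split of one vote each leaves the cluster without a Leader until a later round.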

RaftCore

This object is the main implementation of the Raft protocol in Nacos. At startup it performs a series of initialization steps:

@PostConstruct
public void init() throws Exception {
    Loggers.RAFT.info("initializing Raft sub-system");
    executor.submit(notifier);
    long start = System.currentTimeMillis();
    // Load the log files from disk into the in-memory Datums
    datums = raftStore.loadDatums(notifier);
    // Set the current term
    setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
    Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());
    while (true) {
        // Wait until all tasks produced by the loading step have been processed
        if (notifier.tasks.size() <= 0) {
            break;
        }
        Thread.sleep(1000L);
    }
    // Mark initialization as finished
    initialized = true;
    Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
    // Start the periodic Leader election task
    GlobalExecutor.registerMasterElection(new MasterElection());
    // Start the periodic Leader heartbeat task
    GlobalExecutor.registerHeartbeat(new HeartBeat());

    Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
        GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
}

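After initialization completes, the cluster still cannot serve requests, because no Leader has been elected yet. It can serve requests only after MasterElection has successfully elected a Leader.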

// Leader election task
public class MasterElection implements Runnable {
    @Override
    public void run() {
        try {
            // Has this node finished preparing?
            if (!peers.isReady()) {
                return;
            }
            // Get this node's own information
            RaftPeer local = peers.local();
            // Count down the locally stored Leader timeout
            local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
            // If the Leader timeout has not expired yet, do not start an election
            if (local.leaderDueMs > 0) {
                return;
            }
            // reset timeout
            local.resetLeaderDue();
            local.resetHeartbeatDue();
            // Send vote requests to the other nodes
            sendVote();
        } catch (Exception e) {
            Loggers.RAFT.warn("[RAFT] error while master election {}", e);
        }
    }

    public void sendVote() {
        RaftPeer local = peers.get(NetUtils.localServer());
        Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}",
            JSON.toJSONString(getLeader()), local.term);

        // Raft node cluster reset
        peers.reset();
        local.term.incrementAndGet();
        // Vote for itself
        local.voteFor = local.ip;
        // update node status to CANDIDATE
        local.state = RaftPeer.State.CANDIDATE;
        Map<String, String> params = new HashMap<String, String>(1);
        params.put("vote", JSON.toJSONString(local));
        // Iterate over all nodes except itself
        for (final String server : peers.allServersWithoutMySelf()) {
            final String url = buildURL(server, API_VOTE);
            try {
                HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() {
                    @Override
                    public Integer onCompleted(Response response) throws Exception {
                        if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                            Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", response.getResponseBody(), url);
                            return 1;
                        }
                        // Read the vote result and run the Leader decision
                        RaftPeer peer = JSON.parseObject(response.getResponseBody(), RaftPeer.class);
                        Loggers.RAFT.info("received approve from peer: {}", JSON.toJSONString(peer));
                        peers.decideLeader(peer);
                        return 0;
                    }
                });
            } catch (Exception e) {
                Loggers.RAFT.warn("error while sending vote to server: {}", server);
            }
        }
    }
}
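When a node starts, it assumes that it can be the Leader, so it nominates itself as the candidate and sends vote requests to the other nodes. A node that receives a vote request handles it as follows: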

// How a node reacts when it receives a vote request
public RaftPeer receivedVote(RaftPeer remote) {
    // Is the candidate in the list of Raft cluster nodes?
    if (!peers.contains(remote)) {
        throw new IllegalStateException("can not find peer: " + remote.ip);
    }

    // Get this node's own information
    RaftPeer local = peers.get(NetUtils.localServer());
    // If the candidate's term is not greater than the local term, reject the request:
    // vote for itself (if it has not voted yet) and tell the candidate so
    if (remote.term.get() <= local.term.get()) {
        String msg = "received illegitimate vote" + ", voter-term:" + remote.term + ", votee-term:" + local.term;
        Loggers.RAFT.info(msg);
        if (StringUtils.isEmpty(local.voteFor)) {
            local.voteFor = local.ip;
        }
        return local;
    }
    // The vote condition is met: this node gives its vote to the candidate
    local.resetLeaderDue();
    local.state = RaftPeer.State.FOLLOWER;
    local.voteFor = remote.ip;
    local.term.set(remote.term.get());
    Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);
    return local;
}
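The decision rule in receivedVote comes down to one term comparison. The sketch below reproduces just that rule with a minimal, hypothetical `Peer` type (plain fields instead of RaftPeer's AtomicLong term and state enum) so that the two outcomes are easy to see.

```java
final class VoteRuleSketch {
    // Minimal stand-in for a Raft peer; not the real RaftPeer class.
    static final class Peer {
        final String ip;
        long term;
        String voteFor;

        Peer(String ip, long term) {
            this.ip = ip;
            this.term = term;
        }
    }

    // Applies the vote rule to the local peer and returns it, mirroring how
    // the handler returns the local peer as the vote response.
    static Peer receivedVote(Peer local, Peer remote) {
        if (remote.term <= local.term) {
            // Candidate is not ahead of us: refuse, and vote for ourselves
            // if we have not voted for anyone yet.
            if (local.voteFor == null || local.voteFor.isEmpty()) {
                local.voteFor = local.ip;
            }
            return local;
        }
        // Candidate has a newer term: grant the vote and adopt its term.
        local.voteFor = remote.ip;
        local.term = remote.term;
        return local;
    }
}
```

Since a candidate increments its own term before asking for votes, a node that has not yet started its own election in that round sees a strictly larger term and grants the vote.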

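Through the steps above, a Leader node is eventually elected, and the cluster can then serve requests.

Because this is CP mode, all operations go through the Leader. A Follower does not process client operations itself; it only accepts operation requests from the Leader, so requests have to be forwarded. This is why signalPublish and signalDelete in RaftCore check whether the current node is the Leader and forward the request if it is not.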

public void signalPublish(String key, Record value) throws Exception {
    if (!isLeader()) {
        JSONObject params = new JSONObject();
        params.put("key", key);
        params.put("value", value);
        Map<String, String> parameters = new HashMap<>(1);
        parameters.put("key", key);
        // Forward the request to the Leader
        raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);
        return;
    }
    ...
}

public void signalDelete(final String key) throws Exception {
    OPERATE_LOCK.lock();
    try {
        if (!isLeader()) {
            Map<String, String> params = new HashMap<>(1);
            params.put("key", URLEncoder.encode(key, "UTF-8"));
            // Forward the delete request to the Leader for processing
            raftProxy.proxy(getLeader().ip, API_DEL, params, HttpMethod.DELETE);
            return;
        }
        ...
    } finally {
        // Release the lock taken above
        OPERATE_LOCK.unlock();
    }

There is one more important mechanism: the heartbeat. Raft uses heartbeats to maintain the relationship between the Leader and its Followers.

// Heartbeat task: a node that has become the Leader must send heartbeats to its Followers
public class HeartBeat implements Runnable {
    @Override
    public void run() {
        try {
            // Has this node finished preparing?
            if (!peers.isReady()) {
                return;
            }
            RaftPeer local = peers.local();
            local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
            // Is it time for the next heartbeat?
            if (local.heartbeatDueMs > 0) {
                return;
            }
            // Reset the heartbeat countdown
            local.resetHeartbeatDue();
            // Send the heartbeat
            sendBeat();
        } catch (Exception e) {
            Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
        }
    }

    public void sendBeat() throws IOException, InterruptedException {
        RaftPeer local = peers.local();
        // Only the Leader (or a node in standalone mode) sends heartbeats; otherwise return
        if (local.state != RaftPeer.State.LEADER && !STANDALONE_MODE) {
            return;
        }
        Loggers.RAFT.info("[RAFT] send beat with {} keys.", datums.size());
        // Reset the Leader timeout
        local.resetLeaderDue();
        // build data
        JSONObject packet = new JSONObject();
        packet.put("peer", local);
        JSONArray array = new JSONArray();
        if (switchDomain.isSendBeatOnly()) {
            Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", String.valueOf(switchDomain.isSendBeatOnly()));
        }
        if (!switchDomain.isSendBeatOnly()) {
            // If heartbeats are configured to carry the Leader's data, pack the key and timestamp of each datum
            for (Datum datum : datums.values()) {
                JSONObject element = new JSONObject();
                if (KeyBuilder.matchServiceMetaKey(datum.key)) {
                    element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
                } else if (KeyBuilder.matchInstanceListKey(datum.key)) {
                    element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
                }
                element.put("timestamp", datum.timestamp);
                array.add(element);
            }
        } else {
            Loggers.RAFT.info("[RAFT] send beat only.");
        }
        packet.put("datums", array);
        // broadcast
        Map<String, String> params = new HashMap<String, String>(1);
        params.put("beat", JSON.toJSONString(packet));
        // Compress the parameters with gzip to reduce network usage
        String content = JSON.toJSONString(params);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        GZIPOutputStream gzip = new GZIPOutputStream(out);
        gzip.write(content.getBytes("UTF-8"));
        gzip.close();

        byte[] compressedBytes = out.toByteArray();
        String compressedContent = new String(compressedBytes, "UTF-8");
        Loggers.RAFT.info("raw beat data size: {}, size of compressed data: {}", content.length(), compressedContent.length());
        // Send the heartbeat packet to every other node
        for (final String server : peers.allServersWithoutMySelf()) {
            try {
                final String url = buildURL(server, API_BEAT);
                Loggers.RAFT.info("send beat to server " + server);
                // Send the heartbeat with an asynchronous HTTP request
                HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new AsyncCompletionHandler<Integer>() {
                    @Override
                    public Integer onCompleted(Response response) throws Exception {
                        if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                            Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}", response.getResponseBody(), server);
                            MetricsMonitor.getLeaderSendBeatFailedException().increment();
                            // Do not parse the body of a failed response
                            return 1;
                        }
                        // On success, use the Follower's reply (its current state) to update the peer record
                        peers.update(JSON.parseObject(response.getResponseBody(), RaftPeer.class));
                        Loggers.RAFT.info("receive beat response from: {}", url);
                        return 0;
                    }

                    @Override
                    public void onThrowable(Throwable t) {
                        Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server, t);
                        MetricsMonitor.getLeaderSendBeatFailedException().increment();
                    }
                });
            } catch (Exception e) {
                Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
                MetricsMonitor.getLeaderSendBeatFailedException().increment();
            }
        }
    }
}
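On the receiving side, the Follower essentially packages its own current state and sends it back to the Leader. It also resets its Leader timeout and, based on the heartbeat it received, pulls the latest data from the Leader.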
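The interplay between the election task and the heartbeat can be pictured as a countdown that the heartbeat keeps resetting. The sketch below models only that countdown. The constants are illustrative values chosen for the example, not values taken from GlobalExecutor, and the random jitter that election timeouts normally include is left out to keep the behaviour deterministic. The class name is hypothetical.

```java
final class ElectionTimerSketch {
    static final long TICK_MS = 500L;               // illustrative tick period
    static final long LEADER_TIMEOUT_MS = 15_000L;  // illustrative timeout

    long leaderDueMs = LEADER_TIMEOUT_MS;

    // Called on every scheduler tick. Returns true when the countdown has
    // expired, meaning this node should start an election.
    boolean tick() {
        leaderDueMs -= TICK_MS;
        if (leaderDueMs > 0) {
            return false;
        }
        leaderDueMs = LEADER_TIMEOUT_MS; // reset before sending votes
        return true;
    }

    // Called when a heartbeat from the Leader arrives: the Leader is alive,
    // so the countdown starts over.
    void onHeartbeat() {
        leaderDueMs = LEADER_TIMEOUT_MS;
    }
}
```

As long as heartbeats arrive more often than the timeout, `tick()` never returns true and no election starts; once the Leader goes silent, the countdown runs out and the node becomes a candidate.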

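Why implement both CP and AP consistency strategies?

Some may ask why Nacos implements both the CP and the AP data consistency strategies. In my view, having both in one component means that when you choose a service registry you no longer need to worry about which component to pick for AP and which for CP: you can simply use Nacos, which covers both needs. You get what Zookeeper and Eureka offer in a single component, avoid the operational cost of maintaining two different components, and only need to choose the registration mode that fits your instances.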
