Consul 核心原理解析之-Raft leader选举

Posted 云中间件技术

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Consul 核心原理解析之-Raft leader选举相关的知识,希望对你有一定的参考价值。

Consul 是 HashiCorp 公司推出的开源工具,用于实现分布式系统的服务发现与配置。与其他分布式服务注册与发现的方案相比,Consul包括以下开箱即用的“一站式”能力: 

  • 服务注册与发现

  • 丰富的健康检查

  • Key/Value存储

  • 多数据中心方案

  • Service mesh 

  • 分布式锁、leader选举、分布式协调能力

引入consul后系统不再需要依赖其他类似工具(比如ZooKeeper等) Consul官网(www.consul.io)同时给出了consul 和zookeeper、etcd、euraka、istio等众多竞品的对比,可见其自信程度。

TSF 微服务平台中的服务发现和配置中心基于consul构建,并在consul基础上进行了深度开发使其更加符合TSF产品的需求。主要包括支持consul server集群容量水平扩展、集群级容灾、多中心多活、自动快速构建集群、指定leader迁移、多租户隔离等能力。

以上的优化是建立的对consul 原理深入掌握基础上进行的,后续将分多次对consul的原理以及优化进行深入分享。前期先介绍consul 的算法基石-consul raft库的的原理。

Raft算法简介:数据一致性算法,通过日志复制,保证数据在集群中是一致性的。一个 Raft 集群包含若干个服务器节点;通常是 3 个,这允许整个系统容忍 1 个节点的失效。在任何时刻,每一个服务器节点都处于这三个状态之一:leader、candidate或者follower。在通常情况下,系统中只有一个leader并且其他的节点全部都是follower。follower都是被动的:他们不会发送任何请求,只是简单的响应来自leader或者candidate的请求。leader处理所有的客户端请求(如果一个客户端和follower联系,那么follower会把请求重定向给leader)。

本文重点介绍leader 选举部分,后续会依次介绍consul Raft库的剩下三部分,分别是:日志拷贝,集群成员变化和快照处理。

1 节点启动

通过NewRaft可以创建一个Raft节点,该节点初始启动的时候,会进入follower模式,然后根据实际的集群状态,转变角色Candidate和leader。 NewRaft创建节点的时候,会启动三个goroutine来处理不同的业务。

 
   
   
 
  1. r.goFunc(r.run)   //节点运行协程,通过检查state,根据state(Follower,Candidate, Leader)进入不同的模式

  2. r.goFunc(r.runFSM) //状态机的处理协程,后面详细分析

  3. r.goFunc(r.runSnapshots) //快照处理协程

r.run代码

 
   
   
 
  1. / run is a long running goroutine that runs the Raft FSM.

  2. func (r *Raft) run() {

  3.    for {

  4.        ......

  5.        // Enter into a sub-FSM

  6.        switch r.getState() {  //获取当前节点的角色state

  7.        case Follower:

  8.            r.runFollower()  //从runFollower函数退出,进入Candidate状态,然后进入runCandidate函数

  9.        case Candidate:

  10.            r.runCandidate()  

  11.        case Leader:

  12.            r.runLeader() //如果是leader,那么进入leader处理函数,该函数不会退出,直到本节点退出或者本节点不再是leader

  13.        }

  14.    }

  15. }

2 follower处理逻辑

r.runFollower() follower的工作只有两个,接受来自leader或者Candidate的请求,并进行处理;定期检查与leader的心跳时间是否超时,如果超时了则进入Candidate,该函数也就退出了。

 
   
   
 
  1. func (r *Raft) runFollower() {

  2.    ......

  3.    heartbeatTimer := randomTimeout(r.conf.HeartbeatTimeout)

  4.    for {

  5.        select {

  6.        case rpc := <-r.rpcCh:

  7.            r.processRPC(rpc)  //处理在follower期间收到的来自候选人或者leader的rpc数据包

  8.        case <-heartbeatTimer:

  9.            // Restart the heartbeat timer

  10.            heartbeatTimer = randomTimeout(r.conf.HeartbeatTimeout)

  11.            ......

  12.            r.setState(Candidate)  //心跳时间超时,长时间没有收到leader的请求,进入候选人模式

  13.            return //退出runFollower -> 进入run循环 -> 根据state进入r.runCandidate()

  14.            ......

  15.        case <-r.shutdownCh:

  16.            return

  17.        }

  18.    }

  19. }

说明: 该库中对于一个节点,刚启动的时候,肯定是先进入follower的。并且需要有一个configuration的配置,才会进入候选者模式的。也就是对于一个节点,需要传递集群节点的配置,才会开始选择,否则会一直处于follower的状态(即使心跳时间超时)。 也就是新建完raft节点后,需要调用函数func (r *Raft) BootstrapCluster(configuration Configuration) Future,该函数会把当前集群所有节点的配置写入到log中,这样在选举的时候,就知道需要多少张票才能成为leader。 

3 Candidate的处理

从runFollower中退出,就会进入到runCandidate的处理逻辑中。

 
   
   
 
  1. func (r *Raft) runCandidate() {

  2.    ......

  3.    voteCh := r.electSelf()  //给所有peers发送投票请求,返回的是一个节点数大小的channel voteCh

  4.    electionTimer := randomTimeout(r.conf.ElectionTimeout)  //选举超时时间,有可能多个Candidate同时选举,得到同样的票数

  5.    ......

  6.    grantedVotes := 0

  7.    votesNeeded := r.quorumSize()  //根据configuration计算成为leader需要的票数


  8.    for r.getState() == Candidate {

  9.        select {

  10.        case rpc := <-r.rpcCh:

  11.            r.processRPC(rpc)

  12.        ......

  13.        case vote := <-voteCh:  //收集response。在electSelf的时候,已经给自己投上一票了,往voteCh添加了一个数据。因此这里肯定会执行一次

  14.            if vote.Term > r.getCurrentTerm() {  //收到一个投票请求的回包,对端的term比自己的大,Candidate变成Follower

  15.                r.logger.Printf("[DEBUG] raft: Newer term discovered, fallback to follower")

  16.                r.setState(Follower)

  17.                r.setCurrentTerm(vote.Term)

  18.                return

  19.            }

  20.            if vote.Granted {  //response中,收到同意回包,票数+1

  21.                grantedVotes++

  22.                r.logger.Printf("[DEBUG] raft: Vote granted from %s in term %v. Tally: %d",

  23.                    vote.voterID, vote.Term, grantedVotes)

  24.            }

  25.            if grantedVotes >= votesNeeded { //如果同意票数超过了大多数,当前节点转换角色为leader,进入leader的主循环

  26.                r.logger.Printf("[INFO] raft: Election won. Tally: %d", grantedVotes)

  27.                r.setState(Leader)

  28.                r.setLeader(r.localAddr)

  29.                return

  30.            }

  31.        ......

  32.        case <-electionTimer:  //如果选举超时了,Candidate成为不了leader,那么退出runCandidate,重新发起一次选举

  33.            // Election failed! Restart the election. We simply return,

  34.            // which will kick us back into runCandidate

  35.            r.logger.Printf("[WARN] raft: Election timeout reached, restarting election")

  36.            return

  37.        .....

  38.        }

  39.    }

  40. }

上面分析可以看出,runCandidate主要做两件事情:

1)接受来自其他节点的rpc请求,然后转到函数r.processRPC(rpc)。对于三种角色的节点,都有这个处理的。

2)发起投票流程,收集投票反馈

 其中发起投票的处理逻辑在函数r.electSelf()中。

 
   
   
 
  1. func (r *Raft) electSelf() <-chan *voteResult {

  2.    respCh := make(chan *voteResult, len(r.configurations.latest.Servers))  //回收投票结果的channel缓冲区

  3.    r.setCurrentTerm(r.getCurrentTerm() + 1) //增加当前的任期,当前节点的term会被持久化到磁盘中

  4.    /**

  5.     * 投票开始时,Candidate将自己的term加1,并在request_vote中带上term;

  6.     * Follower只会接受任期号term比自己大的request_vote请求,并为之投票。

  7.     * 这条规则保证了只有最新的Candidate才有可能成为Leader。

  8.     */

  9.    lastIdx, lastTerm := r.getLastEntry()  //获取当前节点的日志信息

  10.    req := &RequestVoteRequest{  //投票请求包

  11.        RPCHeader:    r.getRPCHeader(),

  12.        Term:         r.getCurrentTerm(),

  13.        Candidate:    r.trans.EncodePeer(r.localID, r.localAddr),

  14.        LastLogIndex: lastIdx,

  15.        LastLogTerm:  lastTerm,

  16.    }

  17.    ......

  18.    for _, server := range r.configurations.latest.Servers { //扫描configuration的节点列表,给有效的节点发起投票请求

  19.        if server.Suffrage == Voter {

  20.        //Voter代表该raft节点参与投票流程,另外还有Nonvoter只接受日志拷贝,但是不参与投票和大多数的计算

  21.            if server.ID == r.localID {  //Candidate给自己投票

  22.                if err := r.persistVote(req.Term, req.Candidate); err != nil {  //需要把选票持久化

  23.                    return nil

  24.                }

  25.                // Include our own vote

  26.                respCh <- &voteResult{ //通过respCh传递投票结果

  27.                    RequestVoteResponse: RequestVoteResponse{

  28.                        RPCHeader: r.getRPCHeader(),

  29.                        Term:      req.Term,

  30.                        Granted:   true,

  31.                    },

  32.                    voterID: r.localID,

  33.                }

  34.            } else {

  35.            /**

  36.             *给其他raft节点发起投票请求,请求收集结果,放到respCh中,由runCandidate统一解释respCh,检查是否是有效的选票

  37.             */

  38.                askPeer(server)


  39.            }

  40.        }

  41.    }


  42.    return respCh

  43. }

Candidate发起投票的时候,有几个注意事项: 1)每个服务器都必须有当前的任期号,从零开始,以后逐渐单向往上递增。服务器重启后需要知道当前的任期号才可以正确的很其它节点交流,所以任期号是必须持久化的。 2)如果给候选节点投票了,要记录下被投票的候选节点ID。如果节点在选举期间给了一个候选人投票后突然宕机重启了,如果没有记下这个值,就很可能会重复投票,又给另一个节点投票去了。这就会导致集群存在多个Leader,也就是集群分裂。 

4 r.processRPC(rpc)处理流程

follower,candidate,leader都有处理收到raft节点的rpc请求的函数,对于leader选举过程中,candidate发起的投票请求RequestVoteRequest,follower需要做出回应。先分析一下r.processRPC(rpc)如何处理选举过程中的收包和回包。

 
   
   
 
  1. func (r *Raft) processRPC(rpc RPC) {

  2.    ......

  3.    switch cmd := rpc.Command.(type) {

  4.    case *AppendEntriesRequest:  //日志拷贝请求

  5.        r.appendEntries(rpc, cmd)

  6.    case *RequestVoteRequest: //投票请求

  7.        r.requestVote(rpc, cmd)

  8.    case *InstallSnapshotRequest: //给日志打快照

  9.        r.installSnapshot(rpc, cmd)

  10.    ......

  11.    }

  12. }

 投票过程,具体看r.requestVote(rpc, cmd)

 
   
   
 
  1. func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {  //Follower收到的投票请求 handler函数

  2.    ......

  3.    resp := &RequestVoteResponse{ //先构造一个回包

  4.        RPCHeader: r.getRPCHeader(),

  5.        Term:      r.getCurrentTerm(), //还需要携带投票者的任期号,如果不给候选人投票,可以让候选人能跟上时代(更新自己的任期号)

  6.        Granted:   false,  //是否同意投票

  7.    }

  8.    var rpcErr error

  9.    defer func() {  //函数退出前,会触发回包 response

  10.        rpc.Respond(resp, rpcErr)

  11.    }()

  12.    ......

  13.    candidate := r.trans.DecodePeer(req.Candidate)

  14.    if leader := r.Leader(); leader != "" && leader != candidate {  //如果当前节点中发现集群中已经存在了leader,那么拒绝投票

  15.        r.logger.Printf("[WARN] raft: Rejecting vote request from %v since we have a leader: %v",

  16.            candidate, leader)

  17.        return

  18.    }

  19.    //如果候选人的任期号比自己还小,那么就拒绝投票

  20.    if req.Term < r.getCurrentTerm() {

  21.        return

  22.    }


  23.    // Increase the term if we see a newer one  //更新任期为Candidate的任期

  24.    if req.Term > r.getCurrentTerm() {

  25.        // Ensure transition to follower

  26.        r.setState(Follower)

  27.        r.setCurrentTerm(req.Term)

  28.        resp.Term = req.Term

  29.    }


  30.    // Check if we have voted yet

  31.    lastVoteTerm, err := r.stable.GetUint64(keyLastVoteTerm)

  32.    lastVoteCandBytes, err := r.stable.Get(keyLastVoteCand)

  33.    /**

  34.     如果自己在当期任期已经投票了,那么也必须拒绝投票。同一任期内不得重复投票,否则会导致多个Leader的产生,也就是集群分裂

  35.     */

  36.    // Check if we've voted in this election before

  37.    if lastVoteTerm == req.Term && lastVoteCandBytes != nil {

  38.        r.logger.Printf("[INFO] raft: Duplicate RequestVote for same term: %d", req.Term)

  39.        if bytes.Compare(lastVoteCandBytes, req.Candidate) == 0 {

  40.            r.logger.Printf("[WARN] raft: Duplicate RequestVote from candidate: %s", req.Candidate)

  41.            resp.Granted = true

  42.        }

  43.        return

  44.    }


  45.    // Reject if their term is older

  46.    lastIdx, lastTerm := r.getLastEntry()  //获取本节点最后的日志index和对应的任期。候选者的日志信息必须更加新,才给他投票,否则拒绝

  47.    if lastTerm > req.LastLogTerm {  

  48.        return

  49.    }


  50.    if lastTerm == req.LastLogTerm && lastIdx > req.LastLogIndex {

  51.        return

  52.    }

  53.    //持久化投票的结果

  54.    if err := r.persistVote(req.Term, req.Candidate); err != nil {

  55.        return

  56.    }

  57.    resp.Granted = true

  58.    r.setLastContact() //设置最后一次与leader或者candidate的联系时间,这个很重要,Follower->candidate的时候会去判断上一次的通信的时间点

  59.    return

  60. }

上面介绍了consul raft 库的节点leader 选主的详细流程和处理过程,下图可以更清晰的看到整个过程:


5 总结


本文介绍了consul raft 算法库中节点从启动到leader选举的过程,下次将详细介绍算法库的另一核心部分-日志复制的详细实现原理。

以上是关于Consul 核心原理解析之-Raft leader选举的主要内容,如果未能解决你的问题,请参考以下文章

raft算法解析

由Consul谈到Raft

一起学Consul——Raft算法

手撸golang etcd raft协议之11

什么是Raft算法

手撸golang etcd raft协议之9,10