etcd源码剖析-raft

Posted 考拉苑

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了etcd源码剖析-raft相关的知识,希望对你有一定的参考价值。

今天来讨论下etcd关于:

1. leader是怎么选举的?

2. log的复制怎么实现的?

3. 如何处理节点变更(增加,减少)?

首先来看下raft的node的数据结构:

type node struct {
    propc      chan pb.Message
    recvc      chan pb.Message
    confc      chan pb.ConfChange
    confstatec chan pb.ConfState
    readyc     chan Ready
    advancec   chan struct{}
    tickc      chan struct{}
    done       chan struct{}
    stop       chan struct{}
    status     chan chan Status}   

在raft的实现其实就是通过这些channel来传递各种消息

1. leader的选举

当使用newRaft初始化一个raft的node实例之后,对应的node默认就是是follower,并且其term=1:

// 初始化raft实例
r
:= newRaft(c)
r.becomeFollower(1, None)

// become follower
func (r *raft) becomeFollower(term uint64, lead uint64) { r.step = stepFollower     // r.reset(term)             // r.tick = r.tickElection   // r.lead = lead r.state = StateFollower r.logger.Infof("%x became follower at term %d", r.id, r.Term)}

当前的step= stepFollower,tick = tickElection,这两个是两个函数,后面的vote leader过程中会用到。所有的初始化完成之后对应的新建的node开始运行起来,在node里面定义的channel都会被监听使用。

一般在最开始的时候最先触发的是tickc这个channel,因为应用里面会设置一个heartbeat,在tickElection里面的Step是开始选举的入口:

func (r *raft) tickElection() {
    r.electionElapsed++  // 记录当前elapsed已消耗的时间,后续与electionTimeout进行比较
    // 当前election消息已被广播 通过past election的时效Timeout
// 需要重置 election elapse的时效;并发送MsgHup消息 if r.promotable() && r.pastElectionTimeout() { r.electionElapsed = 0 r.Step(pb.Message{From: r.id, Type: pb.MsgHup}) }}

消息的类型为MsgHup的,会发起一个campaignElection的campaign,在campaign里面将自身设置为candidate,并递增currentTerm:

// 选举
r
.campaign(campaignElection)
//
func (r *raft) campaign(t CampaignType) { r.becomeCandidate()   // 候选者 voteMsg = pb.MsgVote  // message type = vote term = r.Term         // 任期(已属于最新term)

}
// 变成候选者
func (r *raft) becomeCandidate() { r.step = stepCandidate     // step对应stepCandidate r.reset(r.Term + 1)        // 任期 +1 r.tick = r.tickElection    // r.Vote = r.id r.state = StateCandidate    // state r.logger.Infof("%x became candidate at term %d", r.id, r.Term)}

接下来candidate就会向所有的peers来发送一个投票的请求,意在告诉其他的节点当前我的term是多少:发出提议  让其他的node进行投票【此过程属于两阶段提交2PC】

for id := range r.prs {     
    var ctx []byte
    if t == campaignTransfer {
        ctx = []byte(t)
    }
    r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})}

在send函数之中,会把message放到raft的msgs之中:r.msgs = append(r.msgs, m),至于这个msgs在哪里消费的呢,可以看看Read中:【此时msgs并不安全,只是被committed并未被applied持久化】

rd := Ready{
    Entries:          r.raftLog.unstableEntries(),
    CommittedEntries: r.raftLog.nextEnts(),
    Messages:         r.msgs,}

在上文中提到的node里面说到的几种channel: 现在readyc要开始干活了:

case readyc <- rd:
    r.msgs = nil
    r.readStates = nil
    advancec = n.advancec

msgs因为已经读取过了,设置为空,并且会赋值advancec,进行到这readyc中已经有一个Ready对象,下面接着看读Ready的操作,是由应用程序自己去读取:

func (n *node) Ready() <-chan Ready { return n.readyc }

读取到的Ready里面包含了Vote消息,应用层会调用网络层发送消息出去,并且调用Advance(),其他node接收到网络层消息后,会调用Step()函数,来比较接收到的term和自己的term 的大小,如果接收到的比较大,那么就把自己置为follower,term和接收到的消息里面的term保持一致,然后返回一个voteRespMsg,表示为其投票:

r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})

如果自己的term比接收到的大呢,Step()就会直接返回一个空。

voteRespMsg的返回信息被之前的发送方接收到了之后,就会计算收到的选票数目是否大于所有node的一半,如果大于则自己成为leader,否则将自己置为follower:

case myVoteRespType:    
   gr := r.poll(m.From, m.Type, !m.Reject) switch r.quorum() { case gr: // 超过一半支持 成为leader
       if r.state == StatePreCandidate { r.campaign(campaignElection) } else { r.becomeLeader() r.bcastAppend() } case len(r.votes) - gr: // 未达到要求 变成follower
       r.becomeFollower(r.Term, None) }

在成为leader之后,和上面的两个角色一样的,最重要的是step被置为了stepLeader,具体stepLeader中涉及到的一些操作,更多的是下一个问题会用到,这里就不多说了。

func (r *raft) becomeLeader() {
    r.step = stepLeader}


2. log复制的实现

在raft协议之中,对于整个集群的所有变更都必须通过leader来发起;例如我们要同时更新每个节点的某个数据,入口在node.Propose:

func (n *node) Propose(ctx context.Context, data []byte) error {
    return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})}

又到了step函数了,我们先来看看在follower里处理MsgProp的情况,在stepFollower里面可以看到:

case pb.MsgProp:        
       m.To = r.lead    // 将message转发给leader r.send(m)

将消息发送的目标设为当前leader,然后发送过去,leader在接收到数据之后,会将数据中的Entry做如下处理:

case pb.MsgProp:    
   r.appendEntry(m.Entries...)  // 追加到raft log r.bcastAppend()

appendEntry是把数据放到leader的raftLog之中,但是不会把这份数据commit掉,虽然会调用maybeCommit但是现在并不满足commit的条件,接着通过RPC将数据广播给除了自身的其他所有的节点:

func (r *raft) bcastAppend() {
    r.forEachProgress(func(id uint64, _ *Progress) {
        if id == r.id {  // 剔除自身节点
            return
        }
        r.sendAppend(id)
    })}

func (r *raft) sendAppend(to uint64) { m.Type = pb.MsgApp m.Index = pr.Next - 1 m.LogTerm = term m.Entries = ents m.Commit = r.raftLog.committed r.send(m)
}

接下来我们来看看follower接收到这个请求之后,会做一些什么处理:

case pb.MsgApp:    
   r.electionElapsed = 0 r.lead = m.From r.handleAppendEntries(m)

细看handleAppendEntries函数,就是把数据更新到自己的raftlog之中。

func (r *raft) handleAppendEntries(m pb.Message) {
   //
数据的Index小于committed,则说明当前数据比发送的更加新,直接把自己的commited返回给leader if m.Index < r.raftLog.committed { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed}) return } // 数据的index大于committed 则需要更新raftlog的数据,
// 使用maybeAppend,更新成功则返回给leader最新的index
// 否则返回一个reject的标识给leader if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) } else { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()}) }}

在leader接收到follower返回的MsgAppResp之后:

case pb.MsgAppResp:    
  if m.Reject { if pr.maybeDecrTo(m.Index, m.RejectHint) { r.sendAppend(m.From) } }else{ if pr.maybeUpdate(m.Index) { if r.maybeCommit() { r.bcastAppend() }else{ r.sendAppend(m.From) } } }

主要的逻辑如上所示,如果被Reject,那么就直接返回给leader。如果没有被follower Reject,那么这时候就会继续调用之前我们说的maybeCommit,那么在这里是可能被commit的:

func (r *raft) maybeCommit() bool {
    mis := make(uint64Slice, 0, len(r.prs))
    for _, p := range r.prs {
        mis = append(mis, p.Match)
    }
    sort.Sort(sort.Reverse(mis))
    mci := mis[r.quorum()-1]
    return r.raftLog.maybeCommit(mci, r.Term)
}

func (l *raftLog) maybeCommit(maxIndex, term uint64) bool { if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term { l.commitTo(maxIndex) return true } return false}

具体的逻辑就是检查各个peer的matchIndex,如果有大于一半的peer是match的,才会真正commit。如果这里已经commit了,则会再向所有的节点广播这个消息,当follower接收到这个消息之后,同样的也才会真正commit本地的raftlog:

func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
    if l.matchTerm(index, logTerm) {
        lastnewi = index + uint64(len(ents))
        l.commitTo(min(committed, lastnewi))
    }}   

看到这里我们就知道了,是不是有些类似于二阶段提交呢。可能有些朋友会问了,更新状态机的逻辑呢?状态机这部分的逻辑其实在用户代码里面调用node.Advance()之后,节点才会做apply去更新状态机:

case <-advancec:
        if prevHardSt.Commit != 0 {
            r.raftLog.appliedTo(prevHardSt.Commit)
        }func (l *raftLog) appliedTo(i uint64) {
    l.applied = i}       

到此为止,整个的log replication才会真正介绍。

3. 节点变更

从etcd-raft的架构来看,节点变更功能的实现需要应用和底层核心协议处理层互相配合。客户端发起节点增加或移除的命令,应用获得该请求,并将其转换为一个节点变更指令交给底层的raft协议核心处理层,并且一定是要leader来发起。

节点变更的信息在数据结构ConfChange中:

type ConfChange struct {
    ID               uint64  
    Type             ConfChangeType
    NodeID           uint64  
    Context          []byte
    XXX_unrecognized []byte   
}

NodeID为变更节点的ID,Type可以为ConfChangeAddNode,ConfChangeRemoveNode,'ConfChangeUpdateNode',ConfChangeAddLearnerNode。这里我们先只说前两种。

对于添加节点,节点启动之后首先是成为follower,然后在初始log里面对所有的peers添加一个ConfChangeAddNode的entry:

for _, peer := range peers {
        cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
        d, err := cc.Marshal()
        e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d}
        r.raftLog.append(e)
}

然后会apply它们,主要是为了让应用程序在StartNode之后可以调用Campaign。请注意这些节点将被添加到raft两次:在这里,当应用程序的就绪循环调用ApplyConfChange。addNode的调用必须在raftLog.append之后。我们不设置raftLog.applied,因此应用程序将能够通过Ready.CommittedEntries观察所有的conf变化。

r.raftLog.committed = r.raftLog.lastIndex()for _, peer := range peers {
    r.addNode(peer.ID)
}

调用的其实是r.addNodeOrLearnerNode(id, false)方法:

func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
    r.pendingConf = false
    pr := r.getProgress(id)
    if pr == nil {
        r.setProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
    }}   

在raft协议核心处理层,增加节点便是为其分配一个Progress结构,通过该结构追踪对端节点的运行状态:

r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)}

再来看看移除一个节点的时候的逻辑,其他的大体都一样,由客户端发起一个移除节点的指令ConfChangeRemoveNode,收到之后调用的是'removeNode'的方法:

func (r *raft) removeNode(id uint64) {
    r.delProgress(id)
    r.pendingConf = false

    if len(r.prs) == 0 && len(r.learnerPrs) == 0 {
        return
    }

    if r.maybeCommit() {
        r.bcastAppend()
    }

    if r.state == StateLeader && r.leadTransferee == id {
        r.abortLeaderTransfer()
    }

}

这里移除该节点的Progress结构,而且还调用了maybeCommit,这是什么道理呢?因为节点移除一个之后,由于quorum变少而导致的某些之前pending的日志项可以被Commit。


以上是关于etcd源码剖析-raft的主要内容,如果未能解决你的问题,请参考以下文章

etcd Raft 源码剖析

etcd Raft 源码剖析

etcd源码剖析-raft

Etcd 源码分析:Raft 实现

etcd源码解读之raft协议实现

RAFT算法深入(上)