etcd源码剖析-raft
Posted 考拉苑
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了etcd源码剖析-raft相关的知识,希望对你有一定的参考价值。
今天来讨论下etcd关于:
1. leader是怎么选举的?
2. log的复制怎么实现的?
3. 如何处理节点变更(增加,减少)?
首先来看下raft的node的数据结构:
type node struct { propc chan pb.Message recvc chan pb.Message confc chan pb.ConfChange confstatec chan pb.ConfState readyc chan Ready advancec chan struct{} tickc chan struct{} done chan struct{} stop chan struct{} status chan chan Status}
在raft的实现其实就是通过这些channel来传递各种消息
1. leader的选举
当使用newRaft初始化一个raft的node实例之后,对应的node默认就是是follower,并且其term=1:
// 初始化raft实例
r := newRaft(c)
r.becomeFollower(1, None)
// become follower
func (r *raft) becomeFollower(term uint64, lead uint64) { r.step = stepFollower // r.reset(term) // r.tick = r.tickElection // r.lead = lead r.state = StateFollower r.logger.Infof("%x became follower at term %d", r.id, r.Term)}
当前的step= stepFollower
,tick = tickElection
,这两个是两个函数,后面的vote leader过程中会用到。所有的初始化完成之后对应的新建的node开始运行起来,在node里面定义的channel都会被监听使用。
一般在最开始的时候最先触发的是tickc这个channel,因为应用里面会设置一个heartbeat,在tickElection
里面的Step
是开始选举的入口:
func (r *raft) tickElection() { r.electionElapsed++ // 记录当前elapsed已消耗的时间,后续与electionTimeout进行比较 // 当前election消息已被广播 通过past election的时效Timeout
// 需要重置 election elapse的时效;并发送MsgHup消息 if r.promotable() && r.pastElectionTimeout() { r.electionElapsed = 0 r.Step(pb.Message{From: r.id, Type: pb.MsgHup}) }}
消息的类型为MsgHup
的,会发起一个campaignElection的campaign,在campaign里面将自身设置为candidate,并递增currentTerm:
// 选举
r.campaign(campaignElection)
//
func (r *raft) campaign(t CampaignType) { r.becomeCandidate() // 候选者 voteMsg = pb.MsgVote // message type = vote term = r.Term // 任期(已属于最新term)
}
// 变成候选者
func (r *raft) becomeCandidate() { r.step = stepCandidate // step对应stepCandidate r.reset(r.Term + 1) // 任期 +1 r.tick = r.tickElection // r.Vote = r.id r.state = StateCandidate // state r.logger.Infof("%x became candidate at term %d", r.id, r.Term)}
接下来candidate就会向所有的peers来发送一个投票的请求,意在告诉其他的节点当前我的term是多少:发出提议 让其他的node进行投票【此过程属于两阶段提交2PC】
for id := range r.prs { var ctx []byte if t == campaignTransfer { ctx = []byte(t) } r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})}
在send函数之中,会把message放到raft的msgs之中:r.msgs = append(r.msgs, m)
,至于这个msgs在哪里消费的呢,可以看看Read中:【此时msgs并不安全,只是被committed并未被applied持久化】
rd := Ready{ Entries: r.raftLog.unstableEntries(), CommittedEntries: r.raftLog.nextEnts(), Messages: r.msgs,}
在上文中提到的node里面说到的几种channel: 现在readyc
要开始干活了:
case readyc <- rd: r.msgs = nil r.readStates = nil advancec = n.advancec
msgs因为已经读取过了,设置为空,并且会赋值advancec,进行到这readyc中已经有一个Ready对象,下面接着看读Ready的操作,是由应用程序自己去读取:
func (n *node) Ready() <-chan Ready { return n.readyc }
读取到的Ready里面包含了Vote消息,应用层会调用网络层发送消息出去,并且调用Advance(),其他node接收到网络层消息后,会调用Step()函数,来比较接收到的term和自己的term 的大小,如果接收到的比较大,那么就把自己置为follower,term和接收到的消息里面的term保持一致,然后返回一个voteRespMsg,表示为其投票:
r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
如果自己的term比接收到的大呢,Step()就会直接返回一个空。
voteRespMsg的返回信息被之前的发送方接收到了之后,就会计算收到的选票数目是否大于所有node的一半,如果大于则自己成为leader,否则将自己置为follower:
case myVoteRespType:
gr := r.poll(m.From, m.Type, !m.Reject) switch r.quorum() { case gr: // 超过一半支持 成为leader
if r.state == StatePreCandidate { r.campaign(campaignElection) } else { r.becomeLeader() r.bcastAppend() } case len(r.votes) - gr: // 未达到要求 变成follower
r.becomeFollower(r.Term, None) }
在成为leader之后,和上面的两个角色一样的,最重要的是step被置为了stepLeader
,具体stepLeader
中涉及到的一些操作,更多的是下一个问题会用到,这里就不多说了。
func (r *raft) becomeLeader() { r.step = stepLeader}
2. log复制的实现
在raft协议之中,对于整个集群的所有变更都必须通过leader来发起;例如我们要同时更新每个节点的某个数据,入口在node.Propose
:
func (n *node) Propose(ctx context.Context, data []byte) error { return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})}
又到了step函数了,我们先来看看在follower里处理MsgProp
的情况,在stepFollower
里面可以看到:
case pb.MsgProp:
m.To = r.lead // 将message转发给leader r.send(m)
将消息发送的目标设为当前leader,然后发送过去,leader在接收到数据之后,会将数据中的Entry做如下处理:
case pb.MsgProp:
r.appendEntry(m.Entries...) // 追加到raft log r.bcastAppend()
appendEntry
是把数据放到leader的raftLog之中,但是不会把这份数据commit掉,虽然会调用maybeCommit
但是现在并不满足commit的条件,接着通过RPC将数据广播给除了自身的其他所有的节点:
func (r *raft) bcastAppend() { r.forEachProgress(func(id uint64, _ *Progress) { if id == r.id { // 剔除自身节点 return } r.sendAppend(id) })}
func (r *raft) sendAppend(to uint64) { m.Type = pb.MsgApp m.Index = pr.Next - 1 m.LogTerm = term m.Entries = ents m.Commit = r.raftLog.committed r.send(m)
}
接下来我们来看看follower接收到这个请求之后,会做一些什么处理:
case pb.MsgApp:
r.electionElapsed = 0 r.lead = m.From r.handleAppendEntries(m)
细看handleAppendEntries
函数,就是把数据更新到自己的raftlog之中。
func (r *raft) handleAppendEntries(m pb.Message) {
// 数据的Index小于committed,则说明当前数据比发送的更加新,直接把自己的commited返回给leader if m.Index < r.raftLog.committed { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed}) return } // 数据的index大于committed 则需要更新raftlog的数据,
// 使用maybeAppend,更新成功则返回给leader最新的index
// 否则返回一个reject的标识给leader if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) } else { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()}) }}
在leader接收到follower返回的MsgAppResp
之后:
case pb.MsgAppResp:
if m.Reject { if pr.maybeDecrTo(m.Index, m.RejectHint) { r.sendAppend(m.From) } }else{ if pr.maybeUpdate(m.Index) { if r.maybeCommit() { r.bcastAppend() }else{ r.sendAppend(m.From) } } }
主要的逻辑如上所示,如果被Reject,那么就直接返回给leader。如果没有被follower Reject,那么这时候就会继续调用之前我们说的maybeCommit
,那么在这里是可能被commit的:
func (r *raft) maybeCommit() bool { mis := make(uint64Slice, 0, len(r.prs)) for _, p := range r.prs { mis = append(mis, p.Match) } sort.Sort(sort.Reverse(mis)) mci := mis[r.quorum()-1] return r.raftLog.maybeCommit(mci, r.Term)
}
func (l *raftLog) maybeCommit(maxIndex, term uint64) bool { if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term { l.commitTo(maxIndex) return true } return false}
具体的逻辑就是检查各个peer的matchIndex,如果有大于一半的peer是match的,才会真正commit。如果这里已经commit了,则会再向所有的节点广播这个消息,当follower接收到这个消息之后,同样的也才会真正commit本地的raftlog:
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) { if l.matchTerm(index, logTerm) { lastnewi = index + uint64(len(ents)) l.commitTo(min(committed, lastnewi)) }}
看到这里我们就知道了,是不是有些类似于二阶段提交
呢。可能有些朋友会问了,更新状态机的逻辑呢?状态机这部分的逻辑其实在用户代码里面调用node.Advance()
之后,节点才会做apply去更新状态机:
case <-advancec: if prevHardSt.Commit != 0 { r.raftLog.appliedTo(prevHardSt.Commit) }func (l *raftLog) appliedTo(i uint64) { l.applied = i}
到此为止,整个的log replication才会真正介绍。
3. 节点变更
从etcd-raft的架构来看,节点变更功能的实现需要应用和底层核心协议处理层互相配合。客户端发起节点增加或移除的命令,应用获得该请求,并将其转换为一个节点变更指令交给底层的raft协议核心处理层,并且一定是要leader来发起。
节点变更的信息在数据结构ConfChange
中:
type ConfChange struct { ID uint64 Type ConfChangeType NodeID uint64 Context []byte XXX_unrecognized []byte
}
NodeID为变更节点的ID,Type可以为ConfChangeAddNode
,ConfChangeRemoveNode
,'ConfChangeUpdateNode',ConfChangeAddLearnerNode
。这里我们先只说前两种。
对于添加节点,节点启动之后首先是成为follower,然后在初始log里面对所有的peers添加一个ConfChangeAddNode
的entry:
for _, peer := range peers { cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context} d, err := cc.Marshal() e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d} r.raftLog.append(e)
}
然后会apply它们,主要是为了让应用程序在StartNode之后可以调用Campaign。请注意这些节点将被添加到raft两次:在这里,当应用程序的就绪循环调用ApplyConfChange。addNode的调用必须在raftLog.append之后。我们不设置raftLog.applied,因此应用程序将能够通过Ready.CommittedEntries观察所有的conf变化。
r.raftLog.committed = r.raftLog.lastIndex()for _, peer := range peers { r.addNode(peer.ID)
}
调用的其实是r.addNodeOrLearnerNode(id, false)
方法:
func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) { r.pendingConf = false pr := r.getProgress(id) if pr == nil { r.setProgress(id, 0, r.raftLog.lastIndex()+1, isLearner) }}
在raft协议核心处理层,增加节点便是为其分配一个Progress结构,通过该结构追踪对端节点的运行状态:
r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)}
再来看看移除一个节点的时候的逻辑,其他的大体都一样,由客户端发起一个移除节点的指令ConfChangeRemoveNode
,收到之后调用的是'removeNode'的方法:
func (r *raft) removeNode(id uint64) { r.delProgress(id) r.pendingConf = false if len(r.prs) == 0 && len(r.learnerPrs) == 0 { return } if r.maybeCommit() { r.bcastAppend() } if r.state == StateLeader && r.leadTransferee == id { r.abortLeaderTransfer() }
}
这里移除该节点的Progress结构,而且还调用了maybeCommit
,这是什么道理呢?因为节点移除一个之后,由于quorum变少而导致的某些之前pending的日志项可以被Commit。
以上是关于etcd源码剖析-raft的主要内容,如果未能解决你的问题,请参考以下文章