etcd Raft 源码剖析

Posted 凌桓丶

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了etcd Raft 源码剖析相关的知识,希望对你有一定的参考价值。

文章目录


Raft源码阅读

etcd 是 coreOS 使用 golang 开发的分布式,一致性的 kv 存储系统,因其易用性和高可靠性被广泛运用于服务发现、消息发布和订阅、分布式锁和共享配置等方面,也被认为是 zookeeper 的强有力的竞争者。

作为分布式 kv,其底层使用 raft 算法实现多副本数据的强一致性。etcd 作为 raft 开源实现的标杆,在设计上,将 raft 算法逻辑和持久化、网络、线程等完全抽离出来单独实现,充分解耦,在工程上,实现了诸多性能优化,是 raft 开源实践中较早的工业级的实现,很多后来的 raft 实践者都直接或者间接的参考了 ectd-raft 的设计和实现,例如 kubernetes,TIdb 等。其广泛的影响力和优雅的 golang 代码实践也使得 ectd 成为 golang 的明星项目。

在我们实际的分布式存储系统的项目开发中,raft 也被应用于元信息管理和数据存储等多个模块,因此熟悉和理解 etcd-raft 的实现具有重大意义,本文的主要内容就是分析 raft 在 ectd 中的具体实现。

源码链接:etcd

中文注释:etcd-中文注释

代码结构

首先给出代码结构,然后按照各个模块依次介绍

$ tree --dirsfirst -L 1 -I '*test*' -P '*.go'
.
├── raftpb
├── doc.go
├── log.go
├── log_unstable.go
├── logger.go
├── node.go
├── progress.go
├── raft.go
├── rawnode.go
├── read_only.go
├── status.go
├── storage.go
└── util.go


消息结构

Raft 的序列化是基于 Protocol Buffer 实现的,因此在该目录下就定义了几个需要序列化的数据结构。

Entry

从整体上来说,一个集群中的每个节点都是一个状态机,而 raft 管理的就是对这个状态机进行更改的一些操作,这些操作在代码中被封装为一个个 Entry

//https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/raftpb/raft.pb.go

type Entry struct 
	Term             uint64    //选举任期,每次选举之后递增1。用于标记信息的时效性
	Index            uint64    //当前这个entry在整个raft日志中的位置索引
	Type             EntryType //当前entry的类型,目前etcd支持两种类型:EntryNormal和EntryConfChange,EntryNormal代表当前Entry是对状态机的操作,EntryConfChange则代表对当前集群配置进行更改的操作
	Data             []byte    //被序列化后的byte数组,代表当前entry真正要执行的操作。当Type为EntryNormal时为key-value pair;当Type为EntryConfChange时为ConfChange


Message

Raft 集群中节点之间通过传递不同的 Message 来完成通讯,这个 Message 结构涵盖了各种消息所需的字段。

//https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/raftpb/raft.pb.go

type Message struct 
	Type             MessageType //消息类型(心跳MsgHeartbeat、日志MsgApp、投票MsgVote等)
	To               uint64      //这个消息的接受者
	From             uint64      //这个消息的发送者
	Term             uint64      //这个消息发出时整个集群所处的任期,即逻辑时钟
	LogTerm          uint64      //消息发出者所保存的日志中最后一条的任期号
    Index            uint64      //日志索引号。如果当前消息是MsgVote的话,代表这个候选人最后一条日志的索引号
	Entries          []Entry     //需要存储的日志
	Commit           uint64      //已经提交的日志的索引值,用来向别人同步日志的提交信息
	Snapshot         Snapshot    //快照
	Reject           bool        //对方节点拒绝了当前节点的请求(MsgVote/MsgApp/MsgSnap…)
	RejectHint       uint64      //对方节点拒绝了当前节点的请求(MsgVote/MsgApp/MsgSnap…)
	Context          []byte      //上下文信息


日志模块

log_unstable.go

unstable 数据结构用于还没有被用户层持久化的数据,它维护了两部分内容 snapshotentries 。由于日志信息不可能无休止的增长,所以超过一定期限后就会被删除。而为了能保证将数据完整的同步到其他节点上,每个一段时间就会将当前的状态存储为快照的形式,此时新节点只需要以快照为基础,再执行快照之后的日志(通过offset确定),就可以完成状态同步。

//https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/log_unstable.go

type unstable struct 
	// 保存还没有持久化的快照数据
	snapshot *pb.Snapshot
	// 还未持久化的数据
	entries []pb.Entry
	// offset用于保存entries数组中的数据的起始index
	offset  uint64

	logger Logger

具体结构关系如下图

unstable 结构关系图


Storage

这个文件定义了一个 Storage 接口,因为 etcd 中的 raft 实现并不负责数据的持久化,所以它希望上面的应用层能实现这个接口,以便提供给它查询 log 的能力。

//https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/storage.go

type Storage interface 
	// 返回保存的初始状态
	InitialState() (pb.HardState, pb.ConfState, error)
    
	// 返回索引范围在[lo,hi)之内并且不大于maxSize的entries数组
	Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)

	// 传入一个索引值,返回这个索引值对应的任期号,如果不存在则error不为空,其中:
	// ErrCompacted:表示传入的索引数据已经找不到,说明已经被压缩成快照数据了。
	// ErrUnavailable:表示传入的索引值大于当前的最大索引
	Term(i uint64) (uint64, error)
    
	// 获得最后一条数据的索引值
	LastIndex() (uint64, error)

	// 返回第一条数据的索引值
	FirstIndex() (uint64, error)

	// 返回最新的快照数据
	Snapshot() (pb.Snapshot, error)

另外,这个文件也提供了 Storage 接口的一个内存版本的实现 MemoryStorage,这个实现同样也维护了 snapshotentries 这两部分,他们的排列跟 unstable 中的类似,也是 snapshot 在前,entries 在后。从代码中看来 etcdserverraftexample 都是直接用的这个实现来提供 log 的查询功能的。

//https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/storage.go

// 使用在内存中的数组来实现 Storage 接口的结构体,具体实现参考上面的链接
type MemoryStorage struct 
	sync.Mutex

	hardState pb.HardState
	snapshot  pb.Snapshot
	ents []pb.Entry


log.go

这个结构体承担了 raft 日志相关的操作。

//https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/log.go

type raftLog struct 
	// 用于保存自从最后一次snapshot之后提交的数据
	storage Storage

	// 用于保存还没有持久化的数据和快照,这些数据最终都会保存到storage中
	unstable unstable

	// committed数据索引
	committed uint64

	// committed保存是写入持久化存储中的最高index,而applied保存的是传入状态机中的最高index
	// 即一条日志首先要提交成功(即committed),才能被applied到状态机中
	// 因此以下不等式一直成立:applied <= committed
	applied uint64

	logger Logger

一条日志数据,首先需要被 committed 成功,然后才能被应用 applied 到状态机中。因此,以下不等式一直成立:applied <= committed

raftLog 结构图

这个数据布局从下面这段初始化函数也可以看出

//https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/log.go

func newLog(storage Storage, logger Logger) *raftLog 
	if storage == nil 
		log.Panic("storage must not be nil")
	
	log := &raftLog
		storage: storage,
		logger:  logger,
	
	firstIndex, err := storage.FirstIndex()
	if err != nil 
		panic(err) // TODO(bdarnell)
	
	lastIndex, err := storage.LastIndex()
	if err != nil 
		panic(err) // TODO(bdarnell)
	
	// offset从持久化之后的最后一个index的下一个开始
	log.unstable.offset = lastIndex + 1
	log.unstable.logger = logger

	// committed和applied从持久化的第一个index的前一个开始
	log.committed = firstIndex - 1
	log.applied = firstIndex - 1

	return log

持久化存储和非持久化存储的分界线其实就是 lastIndex。在此之前都是 Storage 管理的已经持久化的数据,而在此之后都是 unstable 管理的还没有持久化的数据。


状态机

progress.go

//https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/progress.go

type Progress struct 
	// Next保存的是下一次leader发送append消息时传送过来的日志索引
	// 当选举出新的leader时,首先初始化Next为该leader最后一条日志+1
	// 如果向该节点append日志失败,则递减Next回退日志,一直回退到索引匹配为止

	// Match保存在该节点上保存的日志的最大索引,初始化为0
	// 正常情况下,Next = Match + 1
	// 以下情况下不是上面这种情况:
	// 1. 切换到Probe状态时,如果上一个状态是Snapshot状态,即正在接收快照,那么Next = max(pr.Match+1, pendingSnapshot+1)
	// 2. 当该follower不在Replicate状态时,说明不是正常的接收副本状态。
	//    此时当leader与follower同步leader上的日志时,可能出现覆盖的情况,即此时follower上面假设Match为3,但是索引为3的数据会被
	//    leader覆盖,此时Next指针可能会一直回溯到与leader上日志匹配的位置,再开始正常同步日志,此时也会出现Next != Match + 1的情况出现
	Match, Next uint64

	// 三种状态
	// ProgressStateProbe:探测状态
	// ProgressStateReplicate:副本状态
	// ProgressStateSnapshot:快照状态
	State ProgressStateType

	// 在状态切换到Probe状态以后,该follower就标记为Paused,此时将暂停同步日志到该节点
	Paused bool

	// 如果向该节点发送快照消息,PendingSnapshot用于保存快照消息的索引
	// 当PendingSnapshot不为0时,该节点也被标记为暂停状态。
	// raft只有在这个正在进行中的快照同步失败以后,才会重传快照消息
	PendingSnapshot uint64

	// 如果进程最近处于活跃状态则为 true(收到来自跟随者的任意消息都认为是活动状态)。在超时后会重置重置为false 
	RecentActive bool

	// 用于实现滑动窗口,用来做流量控制
	ins *inflights

对于不同的状态,其会采取不同的行为:

  • ProgressStateProbe:探测状态,当 follower 拒绝了最近的 append 消息时,那么就会进入探测状态,此时 leader 会试图继续往前追溯该 follower 的日志从哪里开始丢失的。在 probe 状态时,leader 每次最多 append 一条日志,如果收到的回应中带有 RejectHint 信息,则回退 Next 索引,以便下次重试。在初始时,leader 会把所有 follower 的状态设为 probe,因为它并不知道各个 follower 的同步状态,所以需要慢慢试探。
  • ProgressStateReplicate:当 leader 确认某个 follower 的同步状态后,它就会把这个 follower 的 state 切换到这个状态,并且用 pipeline 的方式快速复制日志。leader 在发送复制消息之后,就修改该节点的 Next 索引为发送消息的最大索引 + 1。
  • ProgressStateSnapshot:接收快照状态。当 leader 向某个 follower 发送 append 消息,试图让该 follower 状态跟上 leader 时,发现此时 leader上保存的索引数据已经对不上了,比如leader在index为10之前的数据都已经写入快照中了,但是该 follower 需要的是 10 之前的数据,此时就会切换到该状态下,发送快照给该 follower。当快照数据同步追上之后,并不是直接切换到 Replicate 状态,而是首先切换到 Probe 状态。

从上面我们可以看出 Progress 是个状态机,下面是它的状态转移图:

Progress状态转移图


核心算法

raft.go

前面介绍了消息、日志、状态机,接下来就是 raft 的核心实现。其中大部分的逻辑都在状态机函数 Step 中:

//https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/raft.go

// raft的状态机
func (r *raft) Step(m pb.Message) error 
	r.logger.Infof("from:%d, to:%d, type:%s, term:%d, state:%v", m.From, m.To, m.Type, r.Term, r.state)

	// Handle the message term, which may result in our stepping down to a follower.
	switch 
	case m.Term == 0:
		// 来自本地的消息
	case m.Term > r.Term:
		// 消息的Term大于节点当前的Term
		lead := m.From
		if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote 
			// 如果收到的是投票类消息

			// 当context为campaignTransfer时表示强制要求进行竞选
			force := bytes.Equal(m.Context, []byte(campaignTransfer))
			// 是否在租约期以内
			inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
			if !force && inLease 
				// 如果非强制,而且又在租约期以内,就不做任何处理
				// 非强制又在租约期内可以忽略选举消息,见论文的4.2.3,这是为了阻止已经离开集群的节点再次发起投票请求
				r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
					r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
				return nil
			
			// 否则将lead置为空
			lead = None
		
		switch 
		// 注意Go的switch case不做处理的话是不会默认走到default情况的
		case m.Type == pb.MsgPreVote:
			// Never change our term in response to a PreVote
			// 在应答一个prevote消息时不对任期term做修改
		case m.Type == pb.MsgPreVoteResp && !m.Reject:
		default:
			r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
				r.id, r.Term, m.Type, m.From, m.Term)
			// 变成follower状态
			r.becomeFollower(m.Term, lead)
		

	case m.Term < r.Term:
		// 消息的Term小于节点自身的Term,同时消息类型是心跳消息或者是append消息
		if r.checkQuorum && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) 
			// 收到了一个节点发送过来的更小的term消息。这种情况可能是因为消息的网络延时导致,但是也可能因为该节点由于网络分区导致了它递增了term到一个新的任期。
			// ,这种情况下该节点不能赢得一次选举,也不能使用旧的任期号重新再加入集群中。如果checkQurom为false,这种情况可以使用递增任期号应答来处理。
			// 但是如果checkQurom为True,
			// 此时收到了一个更小的term的节点发出的HB或者APP消息,于是应答一个appresp消息,试图纠正它的状态
			r.send(pb.MessageTo: m.From, Type: pb.MsgAppResp)
		 else 
			// ignore other cases
			// 除了上面的情况以外,忽略任何term小于当前节点所在任期号的消息
			r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
				r.id, r.Term, m.Type, m.From, m.Term)
		
		// 在消息的term小于当前节点的term时,不往下处理直接返回了
		return nil
	

    //核心流程
	switch m.Type 
	case pb.MsgHup:
		// 收到HUP消息,说明准备进行选举
		if r.state != StateLeader 
			// 当前不是leader

			// 取出[applied+1,committed+1]之间的消息,即得到还未进行applied的日志列表
			ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
			if err != nil 
				r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
			
			// 如果其中有config消息,并且commited > applied,说明当前还有没有apply的config消息,这种情况下不能开始投票
			if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied 
				r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
				return nil
			

			r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
			// 进行选举
			if r.preVote 
				r.campaign(campaignPreElection)
			 else 
				r.campaign(campaignElection)
			
		 else 
			r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
		

	case pb.MsgVote, pb.MsgPreVote:
		// 收到投票类的消息
		if (r.Vote == None || m.Term > r.Term || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) 
			// 如果当前没有给任何节点投票(r.Vote == None)或者投票的节点term大于本节点的(m.Term > r.Term)
			// 或者是之前已经投票的节点(r.Vote == m.From)
			// 同时还满足该节点的消息是最新的(r.raftLog.isUpToDate(m.Index, m.LogTerm)),那么就接收这个节点的投票
			r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
			r.send(pb.MessageTo: m.From, Type: voteRespMsgType(m.Type))
			if m.Type == pb.MsgVote 
				// 保存下来给哪个节点投票了
				r.electionElapsed = 0
				r.Vote = m.From
			
		 else 
			// 否则拒绝投票
			r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
			r.send(pb.MessageTo: m.From, Type: voteRespMsgType(m.Type), Reject: true)
		

	default:
		// 其他情况下进入各种状态下自己定制的状态机函数
		r.step(r, m)
	
	return nil

将具体逻辑代码去掉,其实我们可以得到以下框架

func (r *raft) Step(m pb.Message) error 
    //...
    switch m.Type 
        case pb.MsgHup:
        //...
        case pb.MsgVote, pb.MsgPreVote:
        //...
        default:
            r.step(r, m)
    

Step 其实就是根据消息类型的不同(MsgHup/MsgVote/MsgPreVote)而去执行对应的逻辑(状态机)。而对于其他状态,此时则会执行 default 中的 step(函数指针,根据节点角色执行不同的函数stepLeader/stepFollower/stepCandidate)

Raft状态机


node.go

node 其实是应用层和 Raft 协议层之间的中转站,其将上层应用层的消息传递给底层协议层模块,并将协议层的结果反馈给应用层。从代码后半段中可以看到,这里通过 for-select 从 channel 中不断处理数据。

//https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/node.go

func (n *node) run(r *raft) 
	var propc chan pb.Message
	var readyc chan Ready
	var advancec chan struct
	var prevLastUnstablei, prevLastUnstablet uint64
	var havePrevLastUnstablei bool
	var prevSnapi uint64
	var rd Ready

	lead := None
	prevSoftSt := r.softState()
	prevHardSt := emptyState

	for 
		if advancec != nil 
			// advance channel不为空,说明还在等应用调用Advance接口通知已经处理完毕了本次的ready数据
			readyc = nil
		 else 
			rd = newReady(r, prevSoftSt, prevHardSt)
			if rd.containsUpdates() 
				// 如果这次ready消息有包含更新,那么ready channel就不为空
				readyc = n.readyc
			 else 
				// 否则为空
				readyc = nil
			
		

		if lead != r.lead 
			// 如果leader发生了变化
			if r.hasLeader() 	// 如果原来有leader
				if lead == None 
					// 当前没有leader
					r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
				 else 
					// leader发生了改变
					r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)
				
				// 有leader,那么可以进行数据提交,prop channel不为空
				propc = n.propc
			 else 
				// 否则,prop channel为空
				r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)
				propc = nil
			
			lead = r.lead
		

		select 
		// TODO: maybe buffer the config propose if there exists one (the way
		// described in raft dissertation)
		// Currently it is dropped in Step silently.
		case m := <-propc:
			// 处理本地收到的提交值
			m.From = r.id
			r.Step(m)
		case m := <-n.recvc:
			// 处理其他节点发送过来的提交值
			// filter out response message from unknown From.
			if _, ok := r.prs[m.From]; ok || !IsResponseMsg(m.Type) 
				// 需要确保节点在集群中或者不是应答类消息的情况下才进行处理
				r.Step(m) // raft never returns an error
			
		case cc := <-n.confc:
			// 接收到配置发生变化的消息
			if cc.NodeID == None 
				// NodeId为空的情况,只需要直接返回当前的nodes就好
				r.resetPendingConf()
				select 
				case n.confstatec <- pb.ConfStateNodes: r.nodes():
				case <-n.done:
				
				break
			
			switch cc.Type 
			case pb.ConfChangeAddNode:
				r.addNode(cc.NodeID)
			case pb.ConfChangeRemoveNode:
				// block incoming proposal when local node is
				// removed
				// 如果删除的是本节点,停止提交
				if cc.NodeID == r.id 
					propc = nil
				
				r.etcd Raft 源码剖析

etcd Raft 源码剖析

etcd源码剖析-raft

RAFT算法深入(上)

Etcd 源码分析:Raft 实现

etcd源码解读之raft协议实现