[8][lab] lab2: raft impl
Posted WhateverYoung
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[8][lab] lab2: raft impl相关的知识,希望对你有一定的参考价值。
lab 2 raft
- 本节作为实现ft KV store的基础部分,实现raft状态机复制协议,lab3基于lab2的raft模块,构建KV service,lab4基于上述构建shared KV service
- 一般来说,容错通过复制集实现状态的复制,保证在少数节点故障的场景下服务依旧可用,挑战是数据的一致性
- Raft控制一个服务的状态复制,保证故障后的一致性,保证所有operator log按照相同的顺序在所有复制集节点上执行,保证所有节点对log的内容达成一致性共识。当故障节点重新上线时,raft采用一定的策略保证它慢慢到达最新的一致性状态,Raft依赖主节点同步log,当没有主节点时,raft不会同步log,而是开始新一轮的选举
- 本节实现Raft为模块独立,每个Raft实例之间通过rpc调用来维持复制状态机,log entries with index numbers,每条entry写来index,最终达成一致的情况下commit,此时raft模块会将结果返回上层服务来执行(仅允许rpc调用,不允许共享变量,不允许依赖共享存储文件等等)
- 主要参考raft paper来做实现,同时参考reference
- 更深入理解一致性,可以参考paxos、chubby、Paxos Made Live、Spanner、Zookeeper、Harp…
- 本节会实现raft论文中大部分操作,但不是全部,包括持久化日志,重启,但不会实现集群身份转变和快照以及日志压缩功能,section 6 and 7
lock advice
- rule 1: 多个go程同时访问的数据结构需要加锁保护,go自带的race检测可以很好的发现这个问题
- rule 2: 类似事务的概念,一连串修改如果要保证同时生效而不是部分可见,需要一起锁住,mu保护两个状态,无论要使用哪个变量都需要获取该锁
rf.mu.Lock()
rf.currentTerm += 1
rf.state = Candidate
rf.mu.Unlock()
- rule 3:当某个go程的一系列读写操作中间被别人修改会出错时,整个部分都需要加锁临界区保护,同时注意currentTerm变量其他go程使用需要同一的锁做保护,某些rpc handle更是需要全过程加锁保护
rf.mu.Lock()
if args.Term > rf.currentTerm
rf.currentTerm = args.Term
rf.mu.Unlock()
- rule 4:加锁的临界区不应该有耗时的阻塞操作,比如读阻塞信道,写信道,等待定时器,sleep,或者发送同步rpc。原因有二,第一是减少了并发度,此时其他go程可以继续持锁干活;第二是防止死锁,peer rpc互相持锁发送rpc依赖对方的锁,会造成死锁;rpc场景如果确实需要持锁,那么可以另起其他go程去做rpc操作,主go程持锁做其他事,之间通过信道通信
- rule 5:小心释放锁重新获得锁之后的状态,一般这么做是为了提高性能,防止等待,但是需要确保重新获取锁之后,状态和之前释放锁的状态是一致的;下述例子,有两个错误,第一rf.currentTerm需要显式传入go程,copy一份,否则go程执行的时候该变量已经被修改了;第二,go程重新获取锁后,需要检查rf.currentTerm是否还和传入的参数一致,一致才能继续操作,不一致则说明条件已经不满足
rf.mu.Lock()
rf.currentTerm += 1
rf.state = Candidate
for <each peer>
go func()
rf.mu.Lock()
args.Term = rf.currentTerm
rf.mu.Unlock()
Call("Raft.RequestVote", &args, ...)
// handle the reply...
()
rf.mu.Unlock()
- 总体来说,满足上述条件的代码开发颇具挑战,具体来说决定临界区何时开始何时结束,哪一系列操作需要加锁是困难,同时并发调试也是不容易的,一个更具有实际意义的方法是,首先假设没有并发,也就不需要锁保护,但是为了发送rpc不阻塞,仍旧需要创建go程,那么可以在每个rpc handlers执行全过程加锁保护,go程开始执行最初获得锁,执行结束在释放锁,完全避免了并发,也就可以满足上述rule 1 2 3 5. 这样就避免从大量代码系列中寻找临界区的麻烦,rule 4依旧是个麻烦,所以下一步就是找到阻塞的位置,增加代码在阻塞之前释放锁,在阻塞之后得到锁后,小心检查此时的状态是否和阻塞之前一致,按照上述的步骤操作较为容易。上述方案的弊端显而易见,就是性能差一些,很多不需要锁的地方也被加锁保护了,换句话说,某一个Raft peer内部全部是串行的,无法发挥多核cpu的特性
Raft Structure Advice
- raft示例主要处理外部事件,包括AppendEntry RequestVote Rpcs,Rpc replies and start calls,后台周期性任务,包括心跳和选举,实现上述所有功能的数据结构有很多,本文给出一些提示
- 每个raft实例的状态(log、current index)这些需要都需要在事件到达过程或者reply过程中并发的进行读写访问,go官方文档针对这种模式提供两种机制,共享数据+锁,信道通信,实践证明前者实现起来更直观一些
- 一些周期性的任务,比如主节点定时发送心跳,备节点在指定时间内没有收到主节点心跳,发起选举,这些周期性任务最好每个都独立的启动go程去执行,而不是用一个后台go程处理所有的事件
- 处理心跳超时重新发起选举是一个棘手问题,最简单的做法,结构中记录上一次主节点心跳的时间戳,超时go程周期性检查该变量是否超时,建议使用time.Sleep()参数使用一个小的常数,而不是使用time.Ticker 或者time.Timer,要想用对这两个需要一些技巧
- 另外一个单独的go程来提交log entry到applyChan,务必保证是一个独立go程,因为发送信道会导致阻塞,另外务必是一个单线程模式,防止order乱序,建议增加ommitIndex后使用使用sync.Cond来c唤醒applyLog GO程
- 每个RPC请求的接收和发送返回需要独立Go程,原因有二,第一某一个不可达的peer不会阻塞大部分的回复过程,第二使得心跳和超时选举可以不被阻塞一致正常进行;简单的实现rpc reply过程使用同一个go程,而不是通过信道发送消息
- 注意,网络会延迟rpc请求的发送和返回,当并发发送rpc请求时,发送顺序和返回都是乱序的,rpc handler需要忽略那些old term的消息;主节点尤为要注意,处理消息返回时,要注意term假设依旧没变,同时需要注意处理并发rpc的返回时,修改leader的状态
Students’ Guide to Raft
代码结构
- 主要开发集中在raft.go
- Make接口,创建一个Raft peer,传入所有peer网络标识符,index,
- Start(command)通知raft系统执行append command到replica log,start应当立即返回,发送ApplyMsg消息将新的commit log entry发送到applyChan
- Raft peers之间通信通过labrpc,基于GO原生rpc库,使用channel代替网络socket,raft.go中包括一些实例rpc,比如sendRequestVode,RequestVote
- labrpc中会对rpc做一些错误注入,延迟,乱序以及删除来代表网络异常,不允许修改labrpc库
to be continue…
labgob
包装gob库,做了两点warning,第一是发送不能导出的结构体成员会warn,因为gob默认会忽略没有导出的成员;第二,如果decode传入的结构体包括一些不是默认值的变量,warn,因为decode不会覆盖这些非默认值。简单来说,都是容易出错且很难排查的场景,因此就在这里提前预警。
TODO register的含义,设计go的反射用法
labrpc
基于channel实现rpc,用来注入网络的故障,比如延时,丢包,乱序,网络隔离等等错误,借鉴net/rpc/server.go实现。
TODO 实现思路学习,涉及go的很多基础案例和用法
Part 2A
实现选举和心跳过程,2A主要考察选举过程,第一,没有失效的时候稳定的有一个主节点,第二,当主节点失效,可以选举出新的主节点
Hints
- 通过figure 2确定需要存储的状态state,存储在raft结构中,同时定义rpc请求消息和返回消息
- 实现投票RPC和同步logRPC
- Make方法中,实现后台定时超时触发选举流程,使用随机数保证不发生分票的情况
- Test需要,保证心跳不超过10次/s,即心跳发生周期大于100ms
- test需要,保证在5s内选举出新的leader,因此timeout超时时间要设置的小一些
- Section5.2中,配置超时时间150ms到300ms,这需要心跳至少发生2次在150ms之内,即心跳周期小于75ms,但是test要求周期大于100ms,因此实验中的timeout要大于paper中的配置,但是不能太大,需要自己尝试找到合适的配置
- 尽量使用time.Sleep,Timer和Ticker容易出错
- 尽可能读懂Figure 2
- 使用Debug print进行错误排查
Part 2B
实现leader和follower的log同步过程,通过心跳rpc,并可以达成共识;实现Start方法,供客户端使用来发送命令到raft,实现commit过程,并将commit的log entry返回给client进行状态机执行
Hints
- 实现选举限制条件,section5.4.1,只有up-to-date log的节点才能被投票
- 注意,当有主节点稳定存在时,不应该发生切主,即小心处理投票rpc,在有稳定主节点时,需要忽略该消息;成为主节点后,要立即发送心跳,同时主要接受心跳过程中对超时timer需要重置,来防止新的选举发生
- 代码经常需要等待某个事件发生,不要轮询,可以使用sync.Cond或者channel,最简单可以用无限循环+sleep实现轮询
- 尽可能的让raft的代码,简洁易懂,后面的实验仍旧需要基于raft构建,请尽可能保证简单性和可理解性
Part 2C
2C主要考察某个raft节点reboot,重新加入集群后仍旧可以保证一致性,这需要持久化raft中的一些关键状态,并在状态发生变更的过程中随时保证关键状态的持久化。真正实现持久化需要保证每一次状态变更都落盘,这样重启后从磁盘中获取到最新的状态。本次实验不用disk,使用内存来模拟。
首先实现persist()和readPersist()方法,这里使用labgod;然后决定何时执行persist方法,即寻找状态变更需要落盘的代码段
Hints
- 2C测试中有很多节点挂掉,网络中断的场景
- 实现文中提到的优化,如果追随者的log和主节点log不一致,要直接返回到该term不一致的最初index
- 2A 2B 2C需要在4分钟内完成,CPU time 1分钟
状态和状态迁移梳理
type LogEntry struct
LogIndex int
LogTerm int
Command interface
主要参考Figure 2:
leaderId int // leader's id, initialized to -1
currentTerm int // latest term server has seen, initialized to 0
votedFor int // candidate that received vote in current term, initialized to -1
commitIndex int // index of highest log entry known to be committed, initialized to 0
lastApplied int // index of highest log entry known to be applied to state machine, initialized to 0
state serverState // state of server
status serverStatus // live or dead
log []LogEntry // log entries len()=5 (0,1,2,3,4 0 is no use) then logIndex=5
logIndex int // index of next log entry to be stored, initialized to 1
nextIndex []int // for each server, index of the next log entry to send to that server
matchIndex []int // for each server, index of highest log entry, used to track committed index
applyCh chan ApplyMsg // apply to client
notifyCh chan struct // notify to apply
timeOutElectionTimer *time.Timer // timer used for timeout for election
// 其中需要持久化的是currentTerm,log[],votedFor
// 上述log[] 0index,不使用,从1开始
// logIndex用来记录下一条log的Index,这里单独一个变量而不是直接append主要是为了方便,实际上上figure中提高的last log index + 1
// nextIndex记录主节点要发送给从每个节点的index,初始化为logIndex即可
// nextIndex初始化为0,代表没有match的index,变量实际含义就是该server已经复制到的index
// commitIndex和lastApplied有点混淆,前者表示leader已经明确复制了多数节点,后者表示已经应用到状态机,即返回给了应用层apply
接下来考虑节点状态迁移:leader,follower,candidate
figure 4给出了迁移图和迁移条件
// caller must hold mu
// reference: Figure 4
func (rf *Raft) stateTransitionWithLock(targetState serverState)
illegalStateLeader:= false
switch rf.state
case Leader:
// detect higher term
if targetState == Follower
rf.state = targetState
else
illegalStateLeader = true
case Follower:
// election timeout
if targetState == Candidate
rf.state = targetState
else
illegalStateLeader = true
case Candidate:
// the term election timeout again, Candidate->Candidate
// the term election success, Candidate->Leader
// the term election fail(find a legal leader) or detect higher term, Candidate->Follower
rf.state = targetState
default:
illegalStateLeader = true
if illegalStateLeader
log.Fatal("server state invalid")
考虑正常运行的raft实例状态可能发生迁移的时机:
1、election timeout,(timeout again when election)=> candidate
2、receive rpc reply with higher term => follower
3、receive rpc requset and find a legal leader =>follower
4、send VoteRpc gain majority votes => leader
trans位置:
1、timeout到达,follower(candidate 上一轮timeout) => candidate
2、选举得到多数票,candidate => leader
3、发送VoteRpc请求,response检测到高term,*=>follower
4、发送AppendRpc请求,response检测到高term,*=>follower
5、接收VoteRpc,检测到高term,*=>follower
6、接收AppendRpc,检测到高term,*=>follower
7、接收AppendRpc,检测到同term主,*=>follower
timeout timer reset触发的位置:
1、触发选举的同时,reset,保证本轮在timeout没有选举出来的场景,开启下一轮选举
2、接收VoteRpc,投票给别人,reset,本轮自己不需要触发选举
3、接收VoteRpc,检测到高term,状态迁移到follower,reset本轮重新开始计时
4、接收AppendRpc,发现合法leader,reset,存在合法主节点,不需要触发选举
5、接收AppendRpc,检测到高term,状态迁移到follower,reset本轮重新开始计时
6、发送VoteRpc请求,response检测到高term,状态迁移到follower,reset本轮重新开始计时
7、发送Append请求,response检测到高term,状态迁移到follower,reset本轮重新开始计时
replicaLogs触发位置:
选举成为leader之后开始后台同步log
VoteRPC
RPC实现:
- rf.currentTerm == args.Term && rf.votedFor == args.CandidateId 返回投票,resetTimer
- rf.currentTerm > args.Term 或 本轮已经投票但不是这位候选,不投票
- 检测到高term,rf.currentTerm, rf.votedFor = args.Term, -1,持久化,状态到follower,resetTimer
- 比较log,term大优先;term一致长的优先,不满足不投票
- 满足要求,rf.votedFor = args.CandidateId,resetTimer,持久化,然后投票
SendVote实现startElection:
- 确认状态不是leader继续
- 状态迁移到Candidate,term+1,self vote,异步发起VoteRPC,false重试;检测到高term结束本轮选举;其他场景收集成功的多数票后,!!!注意此时检测当前term是否还是args.Term!!因为允许多轮选举并发,此处要做判断,多数票是否为本轮选举产生的,状态迁移到leader,持久化,初始化leader数据结构,开始后台replicaLog
- 接收返回的处理,检测到高term,迁移到Follower,resetTimer,持久化后结束本轮选举;(注意发请求之前检测状态是否为Candidate,因为并发,不做也可以,做的话可以快速结束本轮选举)
AppendLogRPC
RPC实现:
- rf.currentTerm > args.Term,返回失败,不合法的leader
- else 合法leader,首先resetTimer,有leader不触发选举
- 检测到高term,触发状态转移到follower,resetTimer,持久化流程
- term一致,状态迁移到follower,主要是为了Candidate到follower
- 判断如何接收log,如果不匹配(方法是是否有prevLogIndex,且term一致),返回conflict index,不匹配term最初的一条logIndex,返回失败
- 如果匹配,将发来的log全部接收(是否需要更新,需要更新的话,更新多少,删除不匹配的部分),更新commitIndex,触发客户端notifyApply
leader bgReplicateLog实现:
- 定时发送,使用sleep即可完成,重试线程和主线程,注意检查state不是主是退出即可
- 失败重试,有个小细节,重试的时候仅发送空消息体,logs等待下次周期到达在发送
- 发送请求,检测状态是否为主?可加可不加
- 构造请求,处理是否为空的条件
- 处理返回,如果失败,两种情况,高term检测,触发状态转移;否则说明log不一致,更新nextIndex为conflictIndex,注意判断不要小于1,且不要超过logIndex,保证nextIndex合法
- 处理返回,如果成功,通过matchIndex判断是否commit,满足commit条件修改commitIndex,触发notifyApply
- notice,返回可能出现乱序,更新nextIndex和matchIndex小心不要回退
StartCommand
- 检查是否为主节点
- 追加log,修改logIndex,持久化后返回成功
bgApply
- notifyApply判断是否满足apply条件,astApplied < logIndex && lastApplied < commitIndex,满足发送信道;触发条件,leader commit和follower接收了log
- 将commit为apply的entry按照顺序发送到applyCh
reference
以上是关于[8][lab] lab2: raft impl的主要内容,如果未能解决你的问题,请参考以下文章