Consul 核心原理解析之-Raft leader选举
Posted 云中间件技术
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Consul 核心原理解析之-Raft leader选举相关的知识,希望对你有一定的参考价值。
Consul 是 HashiCorp 公司推出的开源工具,用于实现分布式系统的服务发现与配置。与其他分布式服务注册与发现的方案相比,Consul包括以下开箱即用的“一站式”能力:
服务注册与发现
丰富的健康检查
Key/Value存储
多数据中心方案
Service mesh
分布式锁、leader选举、分布式协调能力
引入consul后系统不再需要依赖其他类似工具(比如ZooKeeper等) Consul官网(www.consul.io)同时给出了consul 和zookeeper、etcd、euraka、istio等众多竞品的对比,可见其自信程度。
TSF 微服务平台中的服务发现和配置中心基于consul构建,并在consul基础上进行了深度开发使其更加符合TSF产品的需求。主要包括支持consul server集群容量水平扩展、集群级容灾、多中心多活、自动快速构建集群、指定leader迁移、多租户隔离等能力。
以上的优化是建立的对consul 原理深入掌握基础上进行的,后续将分多次对consul的原理以及优化进行深入分享。前期先介绍consul 的算法基石-consul raft库的的原理。
Raft算法简介:数据一致性算法,通过日志复制,保证数据在集群中是一致性的。一个 Raft 集群包含若干个服务器节点;通常是 3 个,这允许整个系统容忍 1 个节点的失效。在任何时刻,每一个服务器节点都处于这三个状态之一:leader、candidate或者follower。在通常情况下,系统中只有一个leader并且其他的节点全部都是follower。follower都是被动的:他们不会发送任何请求,只是简单的响应来自leader或者candidate的请求。leader处理所有的客户端请求(如果一个客户端和follower联系,那么follower会把请求重定向给leader)。
本文重点介绍leader 选举部分,后续会依次介绍consul Raft库的剩下三部分,分别是:日志拷贝,集群成员变化和快照处理。
1 节点启动
通过NewRaft可以创建一个Raft节点,该节点初始启动的时候,会进入follower模式,然后根据实际的集群状态,转变角色Candidate和leader。 NewRaft创建节点的时候,会启动三个goroutine来处理不同的业务。
r.goFunc(r.run) //节点运行协程,通过检查state,根据state(Follower,Candidate, Leader)进入不同的模式
r.goFunc(r.runFSM) //状态机的处理协程,后面详细分析
r.goFunc(r.runSnapshots) //快照处理协程
r.run代码
/ run is a long running goroutine that runs the Raft FSM.
func (r *Raft) run() {
for {
......
// Enter into a sub-FSM
switch r.getState() { //获取当前节点的角色state
case Follower:
r.runFollower() //从runFollower函数退出,进入Candidate状态,然后进入runCandidate函数
case Candidate:
r.runCandidate()
case Leader:
r.runLeader() //如果是leader,那么进入leader处理函数,该函数不会退出,直到本节点退出或者本节点不再是leader
}
}
}
2 follower处理逻辑
r.runFollower() follower的工作只有两个,接受来自leader或者Candidate的请求,并进行处理;定期检查与leader的心跳时间是否超时,如果超时了则进入Candidate,该函数也就退出了。
func (r *Raft) runFollower() {
......
heartbeatTimer := randomTimeout(r.conf.HeartbeatTimeout)
for {
select {
case rpc := <-r.rpcCh:
r.processRPC(rpc) //处理在follower期间收到的来自候选人或者leader的rpc数据包
case <-heartbeatTimer:
// Restart the heartbeat timer
heartbeatTimer = randomTimeout(r.conf.HeartbeatTimeout)
......
r.setState(Candidate) //心跳时间超时,长时间没有收到leader的请求,进入候选人模式
return //退出runFollower -> 进入run循环 -> 根据state进入r.runCandidate()
......
case <-r.shutdownCh:
return
}
}
}
说明: 该库中对于一个节点,刚启动的时候,肯定是先进入follower的。并且需要有一个configuration的配置,才会进入候选者模式的。也就是对于一个节点,需要传递集群节点的配置,才会开始选择,否则会一直处于follower的状态(即使心跳时间超时)。 也就是新建完raft节点后,需要调用函数func (r *Raft) BootstrapCluster(configuration Configuration) Future,该函数会把当前集群所有节点的配置写入到log中,这样在选举的时候,就知道需要多少张票才能成为leader。
3 Candidate的处理
从runFollower中退出,就会进入到runCandidate的处理逻辑中。
func (r *Raft) runCandidate() {
......
voteCh := r.electSelf() //给所有peers发送投票请求,返回的是一个节点数大小的channel voteCh
electionTimer := randomTimeout(r.conf.ElectionTimeout) //选举超时时间,有可能多个Candidate同时选举,得到同样的票数
......
grantedVotes := 0
votesNeeded := r.quorumSize() //根据configuration计算成为leader需要的票数
for r.getState() == Candidate {
select {
case rpc := <-r.rpcCh:
r.processRPC(rpc)
......
case vote := <-voteCh: //收集response。在electSelf的时候,已经给自己投上一票了,往voteCh添加了一个数据。因此这里肯定会执行一次
if vote.Term > r.getCurrentTerm() { //收到一个投票请求的回包,对端的term比自己的大,Candidate变成Follower
r.logger.Printf("[DEBUG] raft: Newer term discovered, fallback to follower")
r.setState(Follower)
r.setCurrentTerm(vote.Term)
return
}
if vote.Granted { //response中,收到同意回包,票数+1
grantedVotes++
r.logger.Printf("[DEBUG] raft: Vote granted from %s in term %v. Tally: %d",
vote.voterID, vote.Term, grantedVotes)
}
if grantedVotes >= votesNeeded { //如果同意票数超过了大多数,当前节点转换角色为leader,进入leader的主循环
r.logger.Printf("[INFO] raft: Election won. Tally: %d", grantedVotes)
r.setState(Leader)
r.setLeader(r.localAddr)
return
}
......
case <-electionTimer: //如果选举超时了,Candidate成为不了leader,那么退出runCandidate,重新发起一次选举
// Election failed! Restart the election. We simply return,
// which will kick us back into runCandidate
r.logger.Printf("[WARN] raft: Election timeout reached, restarting election")
return
.....
}
}
}
上面分析可以看出,runCandidate主要做两件事情:
1)接受来自其他节点的rpc请求,然后转到函数r.processRPC(rpc)。对于三种角色的节点,都有这个处理的。
2)发起投票流程,收集投票反馈
其中发起投票的处理逻辑在函数r.electSelf()中。
func (r *Raft) electSelf() <-chan *voteResult {
respCh := make(chan *voteResult, len(r.configurations.latest.Servers)) //回收投票结果的channel缓冲区
r.setCurrentTerm(r.getCurrentTerm() + 1) //增加当前的任期,当前节点的term会被持久化到磁盘中
/**
* 投票开始时,Candidate将自己的term加1,并在request_vote中带上term;
* Follower只会接受任期号term比自己大的request_vote请求,并为之投票。
* 这条规则保证了只有最新的Candidate才有可能成为Leader。
*/
lastIdx, lastTerm := r.getLastEntry() //获取当前节点的日志信息
req := &RequestVoteRequest{ //投票请求包
RPCHeader: r.getRPCHeader(),
Term: r.getCurrentTerm(),
Candidate: r.trans.EncodePeer(r.localID, r.localAddr),
LastLogIndex: lastIdx,
LastLogTerm: lastTerm,
}
......
for _, server := range r.configurations.latest.Servers { //扫描configuration的节点列表,给有效的节点发起投票请求
if server.Suffrage == Voter {
//Voter代表该raft节点参与投票流程,另外还有Nonvoter只接受日志拷贝,但是不参与投票和大多数的计算
if server.ID == r.localID { //Candidate给自己投票
if err := r.persistVote(req.Term, req.Candidate); err != nil { //需要把选票持久化
return nil
}
// Include our own vote
respCh <- &voteResult{ //通过respCh传递投票结果
RequestVoteResponse: RequestVoteResponse{
RPCHeader: r.getRPCHeader(),
Term: req.Term,
Granted: true,
},
voterID: r.localID,
}
} else {
/**
*给其他raft节点发起投票请求,请求收集结果,放到respCh中,由runCandidate统一解释respCh,检查是否是有效的选票
*/
askPeer(server)
}
}
}
return respCh
}
Candidate发起投票的时候,有几个注意事项: 1)每个服务器都必须有当前的任期号,从零开始,以后逐渐单向往上递增。服务器重启后需要知道当前的任期号才可以正确的很其它节点交流,所以任期号是必须持久化的。 2)如果给候选节点投票了,要记录下被投票的候选节点ID。如果节点在选举期间给了一个候选人投票后突然宕机重启了,如果没有记下这个值,就很可能会重复投票,又给另一个节点投票去了。这就会导致集群存在多个Leader,也就是集群分裂。
4 r.processRPC(rpc)处理流程
follower,candidate,leader都有处理收到raft节点的rpc请求的函数,对于leader选举过程中,candidate发起的投票请求RequestVoteRequest,follower需要做出回应。先分析一下r.processRPC(rpc)如何处理选举过程中的收包和回包。
func (r *Raft) processRPC(rpc RPC) {
......
switch cmd := rpc.Command.(type) {
case *AppendEntriesRequest: //日志拷贝请求
r.appendEntries(rpc, cmd)
case *RequestVoteRequest: //投票请求
r.requestVote(rpc, cmd)
case *InstallSnapshotRequest: //给日志打快照
r.installSnapshot(rpc, cmd)
......
}
}
投票过程,具体看r.requestVote(rpc, cmd)
func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { //Follower收到的投票请求 handler函数
......
resp := &RequestVoteResponse{ //先构造一个回包
RPCHeader: r.getRPCHeader(),
Term: r.getCurrentTerm(), //还需要携带投票者的任期号,如果不给候选人投票,可以让候选人能跟上时代(更新自己的任期号)
Granted: false, //是否同意投票
}
var rpcErr error
defer func() { //函数退出前,会触发回包 response
rpc.Respond(resp, rpcErr)
}()
......
candidate := r.trans.DecodePeer(req.Candidate)
if leader := r.Leader(); leader != "" && leader != candidate { //如果当前节点中发现集群中已经存在了leader,那么拒绝投票
r.logger.Printf("[WARN] raft: Rejecting vote request from %v since we have a leader: %v",
candidate, leader)
return
}
//如果候选人的任期号比自己还小,那么就拒绝投票
if req.Term < r.getCurrentTerm() {
return
}
// Increase the term if we see a newer one //更新任期为Candidate的任期
if req.Term > r.getCurrentTerm() {
// Ensure transition to follower
r.setState(Follower)
r.setCurrentTerm(req.Term)
resp.Term = req.Term
}
// Check if we have voted yet
lastVoteTerm, err := r.stable.GetUint64(keyLastVoteTerm)
lastVoteCandBytes, err := r.stable.Get(keyLastVoteCand)
/**
如果自己在当期任期已经投票了,那么也必须拒绝投票。同一任期内不得重复投票,否则会导致多个Leader的产生,也就是集群分裂
*/
// Check if we've voted in this election before
if lastVoteTerm == req.Term && lastVoteCandBytes != nil {
r.logger.Printf("[INFO] raft: Duplicate RequestVote for same term: %d", req.Term)
if bytes.Compare(lastVoteCandBytes, req.Candidate) == 0 {
r.logger.Printf("[WARN] raft: Duplicate RequestVote from candidate: %s", req.Candidate)
resp.Granted = true
}
return
}
// Reject if their term is older
lastIdx, lastTerm := r.getLastEntry() //获取本节点最后的日志index和对应的任期。候选者的日志信息必须更加新,才给他投票,否则拒绝
if lastTerm > req.LastLogTerm {
return
}
if lastTerm == req.LastLogTerm && lastIdx > req.LastLogIndex {
return
}
//持久化投票的结果
if err := r.persistVote(req.Term, req.Candidate); err != nil {
return
}
resp.Granted = true
r.setLastContact() //设置最后一次与leader或者candidate的联系时间,这个很重要,Follower->candidate的时候会去判断上一次的通信的时间点
return
}
上面介绍了consul raft 库的节点leader 选主的详细流程和处理过程,下图可以更清晰的看到整个过程:
5 总结
本文介绍了consul raft 算法库中节点从启动到leader选举的过程,下次将详细介绍算法库的另一核心部分-日志复制的详细实现原理。
以上是关于Consul 核心原理解析之-Raft leader选举的主要内容,如果未能解决你的问题,请参考以下文章