Raft Implementation - Sofa-JRaft: Leader Election, Data Writes, Log Replication
Posted by Leo Han
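For the details of the Raft algorithm itself, see the earlier article "Distributed consensus algorithms: two-phase commit, three-phase commit, Paxos, Raft, ZooKeeper leader election, the ZAB protocol, sequential consistency, the data write flow, node states, and node roles".
Here we look at how sofa-jraft, open-sourced by Ant Financial (an Alibaba affiliate), implements it.
First, sofa-jraft has a few important roles: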
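- Node: represents a server node.
- Ballot: holds the information for one round of voting.
- PeerId: represents one participant in a replication group.
- StateMachine: once data has been committed on the Node, the Node calls its onApply method.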
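Node also has several important timers:
- electionTimer: the election timer. If the current leader goes down, it triggers preVote.
- voteTimer: the vote timer. When a vote times out, it triggers preVote.
- stepDownTimer: used by the leader to check whether it is still alive as leader, to detect whether any node in the cluster has gone offline, and to update the leader's timestamp.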
Leader Election Voting
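Elections in JRaft take two steps: preVote and vote. The preVote step exists to stop a node that cannot sync with the leader (for example, because it is partitioned away) from repeatedly starting elections and raising its own term. Without it, that node's term would grow larger than the leader's and, once the node reconnects, force the leader to give up leadership and start a new round of elections.
With preVote, a node must first obtain pre-votes from more than half of the nodes before it is allowed to start a real election.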
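To make the motivation concrete, here is a small self-contained model of an isolated follower. It is not JRaft code: the class PreVoteTermDemo, its method, and its parameters are hypothetical, and the model assumes that every reachable peer would grant the pre-vote. It compares how far the term drifts with and without a pre-vote step.

```java
/** Hypothetical model of an isolated follower; not part of JRaft. */
final class PreVoteTermDemo {

    /**
     * Returns the term a node ends up with after the given number of election
     * timeouts. With pre-vote enabled, the term is incremented only when the
     * node can collect pre-votes from a majority of the cluster.
     */
    static long termAfterTimeouts(long startTerm, int timeouts, boolean usePreVote,
                                  int clusterSize, int reachablePeers) {
        long term = startTerm;
        int quorum = clusterSize / 2 + 1;
        for (int i = 0; i < timeouts; i++) {
            if (usePreVote) {
                // Own pre-vote plus every reachable peer (assumed to grant it).
                int granted = 1 + reachablePeers;
                if (granted >= quorum) {
                    term++; // a real election starts only now
                }
            } else {
                term++; // without pre-vote, every timeout starts an election
            }
        }
        return term;
    }

    public static void main(String[] args) {
        // A node in a 3-node cluster that cannot reach anyone, 10 timeouts.
        System.out.println(termAfterTimeouts(5, 10, false, 3, 0)); // prints 15
        System.out.println(termAfterTimeouts(5, 10, true, 3, 0));  // prints 5
    }
}
```

In this model the isolated node keeps term 5 when pre-vote is enabled, so it cannot disturb the leader when it rejoins.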
An election in JRaft is triggered by a timer firing. In NodeImpl (the concrete implementation of Node), NodeImpl.init sets up electionTimer:
this.electionTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs(),
    TIMER_FACTORY.getElectionTimer(this.options.isSharedElectionTimer(), name)) {

    // Called each time the election timer fires.
    protected void onTrigger() {
        handleElectionTimeout();
    }

    // Randomizes the next timeout.
    protected int adjustTimeout(final int timeoutMs) {
        return randomTimeout(timeoutMs);
    }
};
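The adjustTimeout override above delegates to randomTimeout, whose body is not shown in this article. A minimal sketch of the idea, assuming the timeout is the base value plus a random extra delay: the class name and the maxDelayMs parameter are hypothetical, not JRaft's actual signature. Randomizing the timeout makes it less likely that several followers time out at the same moment and split the vote.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical sketch of a randomized election timeout; not JRaft code. */
final class RandomTimeoutDemo {

    /**
     * Returns a value in [timeoutMs, timeoutMs + maxDelayMs).
     * maxDelayMs is an assumed tuning knob and must be greater than 0.
     */
    static int randomTimeout(int timeoutMs, int maxDelayMs) {
        return ThreadLocalRandom.current().nextInt(timeoutMs, timeoutMs + maxDelayMs);
    }

    public static void main(String[] args) {
        // Three followers with the same base timeout get different deadlines
        // in most runs, so usually only one of them starts a pre-vote first.
        for (int i = 0; i < 3; i++) {
            System.out.println(randomTimeout(1000, 1000));
        }
    }
}
```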
private void handleElectionTimeout() {
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        // Only a follower reacts to the election timeout.
        if (this.state != State.STATE_FOLLOWER) {
            return;
        }
        // The leader is still considered alive: nothing to do.
        if (isCurrentLeaderValid()) {
            return;
        }
        resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT, "Lost connection from leader %s.",
            this.leaderId));
        // Judge whether to launch a election.
        if (!allowLaunchElection()) {
            return;
        }
        // preVote() takes over responsibility for releasing the lock.
        doUnlock = false;
        preVote();
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}
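The body of isCurrentLeaderValid is not shown here. The log message in handlePreVoteRequest speaks of the leader's lease, so one plausible reading is a time-based check: the leader counts as valid while the last message from it is younger than the election timeout. The sketch below models that assumption; the class, its fields, and the explicit nowMs parameters are hypothetical and chosen so the logic can be exercised without a real clock.

```java
/** Hypothetical lease-style leader check; an assumption, not JRaft's code. */
final class LeaderLeaseDemo {
    private final long electionTimeoutMs;
    private long lastLeaderTimestampMs;

    LeaderLeaseDemo(long electionTimeoutMs, long nowMs) {
        this.electionTimeoutMs = electionTimeoutMs;
        this.lastLeaderTimestampMs = nowMs;
    }

    /** Record that a message from the leader arrived at nowMs. */
    void onLeaderMessage(long nowMs) {
        this.lastLeaderTimestampMs = nowMs;
    }

    /** The leader is valid while its last message is younger than the timeout. */
    boolean isCurrentLeaderValid(long nowMs) {
        return nowMs - this.lastLeaderTimestampMs < this.electionTimeoutMs;
    }

    /** Mirrors the two guards at the top of handleElectionTimeout. */
    boolean shouldPreVote(boolean isFollower, long nowMs) {
        return isFollower && !isCurrentLeaderValid(nowMs);
    }

    public static void main(String[] args) {
        LeaderLeaseDemo node = new LeaderLeaseDemo(1000, 0);
        System.out.println(node.shouldPreVote(true, 999));  // prints false
        System.out.println(node.shouldPreVote(true, 1000)); // prints true
    }
}
```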
handleElectionTimeout mainly performs the preVote operation. The main steps of one round of voting in JRaft are:
preVote ===> handlePreVoteRequest ===> electSelf ===> handleRequestVoteRequest
Let's look at preVote first:
private void preVote() {
    .....
    final LogId lastLogId = this.logManager.getLastLogId(true);
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        // pre_vote need defense ABA after unlock&writeLock
        if (oldTerm != this.currTerm) {
            LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm);
            return;
        }
        this.prevVoteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
        for (final PeerId peer : this.conf.listPeers()) {
            if (peer.equals(this.serverId)) {
                continue;
            }
            if (!this.rpcService.connect(peer.getEndpoint())) {
                LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
                continue;
            }
            final OnPreVoteRpcDone done = new OnPreVoteRpcDone(peer, this.currTerm);
            done.request = RequestVoteRequest.newBuilder() //
                .setPreVote(true) // it's a pre-vote request.
                .setGroupId(this.groupId) //
                .setServerId(this.serverId.toString()) //
                .setPeerId(peer.toString()) //
                .setTerm(this.currTerm + 1) // next term
                .setLastLogIndex(lastLogId.getIndex()) //
                .setLastLogTerm(lastLogId.getTerm()) //
                .build();
            this.rpcService.preVote(peer.getEndpoint(), done.request, done);
        }
        this.prevVoteCtx.grant(this.serverId);
        if (this.prevVoteCtx.isGranted()) {
            doUnlock = false;
            electSelf();
        }
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}
As shown, preVote sends a RequestVoteRequest to every node other than itself. The request is built mainly as follows:
RequestVoteRequest.newBuilder() //
    .setPreVote(true) // it's a pre-vote request.
    .setGroupId(this.groupId) //
    .setServerId(this.serverId.toString()) //
    .setPeerId(peer.toString()) //
    .setTerm(this.currTerm + 1) // next term
    .setLastLogIndex(lastLogId.getIndex()) //
    .setLastLogTerm(lastLogId.getTerm()) //
    .build();
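Note that at this point the node does not set its own currTerm to currTerm + 1; it only sends the value currTerm + 1 in the request. This differs from a real election, which first increments currTerm (currTerm++) and then sends it.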
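The difference between the two steps can be sketched in a few lines. TermProposalDemo and its methods are hypothetical names used only for illustration: a pre-vote advertises the next term without touching local state, while a real election mutates the local term before advertising it.

```java
/** Hypothetical illustration of the term handling; not JRaft code. */
final class TermProposalDemo {
    long currTerm;

    TermProposalDemo(long currTerm) {
        this.currTerm = currTerm;
    }

    /** Pre-vote: advertise currTerm + 1, leave currTerm unchanged. */
    long preVoteTerm() {
        return this.currTerm + 1;
    }

    /** Real election: increment currTerm first, then advertise it. */
    long startElection() {
        this.currTerm++;
        return this.currTerm;
    }

    public static void main(String[] args) {
        TermProposalDemo node = new TermProposalDemo(3);
        System.out.println(node.preVoteTerm() + " " + node.currTerm);   // prints 4 3
        System.out.println(node.startElection() + " " + node.currTerm); // prints 4 4
    }
}
```

A failed pre-vote therefore leaves no trace in the node's term, which is what keeps a partitioned node from inflating it.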
Let's see how the other nodes handle this request:
public Message handlePreVoteRequest(final RequestVoteRequest request) {
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        if (!this.state.isActive()) {
            LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
            return RpcFactoryHelper //
                .responseFactory() //
                .newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
                    "Node %s is not in active state, state %s.", getNodeId(), this.state.name());
        }
        final PeerId candidateId = new PeerId();
        if (!candidateId.parse(request.getServerId())) {
            LOG.warn("Node {} received PreVoteRequest from {} serverId bad format.", getNodeId(),
                request.getServerId());
            return RpcFactoryHelper //
                .responseFactory() //
                .newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
                    "Parse candidateId failed: %s.", request.getServerId());
        }
        boolean granted = false;
        // noinspection ConstantConditions
        do {
            if (!this.conf.contains(candidateId)) {
                LOG.warn("Node {} ignore PreVoteRequest from {} as it is not in conf <{}>.", getNodeId(),
                    request.getServerId(), this.conf);
                break;
            }
            if (this.leaderId != null && !this.leaderId.isEmpty() && isCurrentLeaderValid()) {
                LOG.info(
                    "Node {} ignore PreVoteRequest from {}, term={}, currTerm={}, because the leader {}'s lease is still valid.",
                    getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, this.leaderId);
                break;
            }
            if (request.getTerm() < this.currTerm) {
                LOG.info("Node {} ignore PreVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
                    request.getServerId(), request.getTerm(), this.currTerm);
                // A follower replicator may not be started when this node become leader, so we must check it.
                checkReplicator(candidateId);
                break;
            }
            // A follower replicator may not be started when this node become leader, so we must check it.
            // check replicator state
            checkReplicator(candidateId);
            doUnlock = false;
            this.writeLock.unlock();
            final LogId lastLogId = this.logManager.getLastLogId(true);
            doUnlock = true;
            this.writeLock.lock();
            final LogId requestLastLogId = new LogId(request.getLastLogIndex(), request.getLastLogTerm());
            granted = requestLastLogId.compareTo(lastLogId) >= 0;
            LOG.info(
                "Node {} received PreVoteRequest from {}, term={}, currTerm={}, granted={}, requestLastLogId={}, lastLogId={}.",
                getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, granted, requestLastLogId,
                lastLogId);
        } while (false);
        return RequestVoteResponse.newBuilder() //
            .setTerm(this.currTerm) //
            .setGranted(granted) //
            .build();
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}
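When another node receives the PreVoteRequest, it makes the following checks:
- If the candidate is not part of the current node's configuration, it returns granted=false.
- If the current node's leader is still alive (its lease is still valid), it returns granted=false.
- If the term in the preVote request is smaller than the current node's term, it returns granted=false.
- If the candidate's last log entry (compared by term and index) is behind the current node's last log entry, it returns granted=false.
- If none of the above applies, it returns granted=true.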
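The checks above can be condensed into a single pure function. This is a sketch rather than JRaft's code: PreVoteDecisionDemo, LogPos, and the boolean parameters are hypothetical, and LogPos assumes the usual Raft ordering in which the last log term is compared first and the index breaks ties.

```java
/** Hypothetical condensed form of the pre-vote checks; not JRaft code. */
final class PreVoteDecisionDemo {

    /** Stand-in for a log position, ordered by term first, then index (assumed). */
    static final class LogPos implements Comparable<LogPos> {
        final long index;
        final long term;

        LogPos(long index, long term) {
            this.index = index;
            this.term = term;
        }

        @Override
        public int compareTo(LogPos other) {
            int byTerm = Long.compare(this.term, other.term);
            return byTerm != 0 ? byTerm : Long.compare(this.index, other.index);
        }
    }

    static boolean grantPreVote(boolean candidateInConf, boolean leaderLeaseValid,
                                long requestTerm, long currTerm,
                                LogPos requestLastLog, LogPos localLastLog) {
        if (!candidateInConf) {
            return false; // unknown candidate
        }
        if (leaderLeaseValid) {
            return false; // this node still trusts its leader
        }
        if (requestTerm < currTerm) {
            return false; // stale candidate
        }
        // Grant only if the candidate's log is at least as up to date as ours.
        return requestLastLog.compareTo(localLastLog) >= 0;
    }

    public static void main(String[] args) {
        LogPos local = new LogPos(10, 3);
        System.out.println(grantPreVote(true, false, 4, 3, new LogPos(10, 3), local)); // prints true
        System.out.println(grantPreVote(true, false, 4, 3, new LogPos(9, 3), local));  // prints false
        System.out.println(grantPreVote(true, true, 4, 3, new LogPos(12, 3), local));  // prints false
    }
}
```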
That is how the other nodes handle a PreVoteRequest. Now let's look at how the node that initiated the preVote handles the responses it receives:
public void handlePreVoteResponse(final PeerId peerId, final long term, final RequestVoteResponse response) {
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        if (this.state != State.STATE_FOLLOWER) {
            LOG.warn("Node {} received invalid PreVoteResponse from {}, state not in STATE_FOLLOWER but {}.",
                getNodeId(), peerId, this.state);
            return;
        }
        if (term != this.currTerm) {
            LOG.warn("Node {} received invalid PreVoteResponse from {}, term={}, currTerm={}.", getNodeId(),
                peerId, term, this.currTerm);
            return;
        }
        if (response.getTerm() > this.currTerm) {
            LOG.warn("Node {} received invalid PreVoteResponse from {}, term {}, expect={}.", getNodeId(), peerId,
                response.getTerm(), this.currTerm);
            stepDown(response.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
                "Raft node receives higher term pre_vote_response."));
            return;
        }
        LOG.info("Node {} received PreVoteResponse from {}, term={}, granted={}.", getNodeId(), peerId,
            response.getTerm(), response.getGranted());
        // check granted quorum?
        if (response.getGranted()) {
            this.prevVoteCtx.grant(peerId);
            if (this.prevVoteCtx.isGranted()) {
                doUnlock = false;
                electSelf();
            }
        }
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}
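Here the term parameter is the node's term at the time the preVote was sent, and peerId identifies the node that the preVote request was sent to. The logic is:
- If the node is no longer a follower, the response is discarded.
- If the current term differs from the term at send time, a new round of voting has started; the response is stale and is discarded.
- If the term in the response (the responder's term) is larger than the current node's term, the responder is in a later voting round; the node calls stepDown with that term and returns.
- If the response grants the preVote, that is, response.getGranted() is true, the node records the grant and checks whether more than half of the nodes have agreed in this round; if so, it calls electSelf.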
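The same decision sequence can be modeled as a small state holder that returns what happened for each response. Everything here is hypothetical (class name, Outcome enum, string peer ids); stepping down is reduced to adopting the higher term, which is a simplification of what the stepDown call does.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of pre-vote response handling; not JRaft code. */
final class PreVoteResponseDemo {
    enum Outcome { IGNORED, STEPPED_DOWN, WAITING, START_ELECTION }

    boolean follower = true;
    long currTerm;
    private final int quorum;
    private final Set<String> granted = new HashSet<>();

    PreVoteResponseDemo(long currTerm, int clusterSize, String self) {
        this.currTerm = currTerm;
        this.quorum = clusterSize / 2 + 1;
        this.granted.add(self); // the node grants its own pre-vote
    }

    Outcome onResponse(String peer, long termWhenSent, long responseTerm, boolean isGranted) {
        if (!this.follower) {
            return Outcome.IGNORED; // state changed in the meantime
        }
        if (termWhenSent != this.currTerm) {
            return Outcome.IGNORED; // response belongs to an older round
        }
        if (responseTerm > this.currTerm) {
            this.currTerm = responseTerm; // simplified stand-in for stepDown
            return Outcome.STEPPED_DOWN;
        }
        if (isGranted) {
            this.granted.add(peer); // a set ignores duplicate grants
            if (this.granted.size() >= this.quorum) {
                return Outcome.START_ELECTION;
            }
        }
        return Outcome.WAITING;
    }

    public static void main(String[] args) {
        PreVoteResponseDemo node = new PreVoteResponseDemo(5, 3, "a");
        System.out.println(node.onResponse("b", 5, 5, false)); // prints WAITING
        System.out.println(node.onResponse("c", 5, 5, true));  // prints START_ELECTION
    }
}
```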
NodeImpl holds two Ballot instances: prevVoteCtx, which backs preVote, and a second one (voteCtx), which backs vote. When a preVote starts, prevVoteCtx is initialized:
public boolean init(final Configuration conf, final Configuration oldConf) {
    this.peers.clear();
    this.oldPeers.clear();
    this.quorum = this.oldQuorum = 0;
    int index = 0;
    if (conf != null) {
        for (final PeerId peer : conf) {
            this.peers.add(new UnfoundPeerId(peer, index++, false));
        }
    }
    // Majority of the current configuration.
    this.quorum = this.peers.size() / 2 + 1;
    if (oldConf == null) {
        return true;
    }
    index = 0;
    for (final PeerId peer : oldConf) {
        this.oldPeers.add(new UnfoundPeerId(peer, index++, false));
    }
    // Majority of the old configuration, when one is passed in.
    this.oldQuorum = this.oldPeers.size() / 2 + 1;
    return true;
}
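As shown, init sets the Ballot's quorum to (number of peers in the configuration) / 2 + 1, using integer division. When an old configuration is passed in, oldQuorum is computed the same way from the old peers.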
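A quick worked example of that arithmetic, as a standalone sketch (QuorumDemo and its methods are hypothetical helpers, not part of JRaft). It also shows how many unavailable nodes each cluster size can tolerate, which is why adding a fourth node to a three-node group does not improve fault tolerance.

```java
/** Hypothetical helper illustrating the quorum arithmetic; not JRaft code. */
final class QuorumDemo {

    /** Majority size for a group of the given number of peers. */
    static int quorum(int peers) {
        return peers / 2 + 1; // integer division
    }

    /** Number of peers that may be unavailable while a majority still exists. */
    static int tolerableFailures(int peers) {
        return peers - quorum(peers);
    }

    public static void main(String[] args) {
        for (int peers = 3; peers <= 5; peers++) {
            System.out.println(peers + " peers: quorum=" + quorum(peers)
                + ", tolerable failures=" + tolerableFailures(peers));
        }
        // prints:
        // 3 peers: quorum=2, tolerable failures=1
        // 4 peers: quorum=3, tolerable failures=1
        // 5 peers: quorum=3, tolerable failures=2
    }
}
```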
Now let's see how, once a preVote request has been granted, the node decides whether to start an election. Near the end of handlePreVoteResponse, this.prevVoteCtx.grant(peerId) is executed:
public void grant(final PeerId peerId) {
    grant(peerId, new PosHint());
}

public PosHint grant(final PeerId peerId, final PosHint hint) {
    UnfoundPeerId peer = findPeer(peerId, this.peers, hint.pos0);
    if (peer != null) {
        if (!peer.found) {
            peer.found = true;
            .....
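The excerpt stops right after a peer is marked as found, so the rest of the counting is not shown. One simple way to complete the picture, offered as an assumption rather than as JRaft's actual code, is a countdown: each first-time grant from a known peer lowers the remaining quorum, and the ballot is granted once it reaches zero. SimpleBallot below is a hypothetical single-configuration version of that idea with string peer ids.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical countdown ballot for one configuration; not JRaft's Ballot. */
final class SimpleBallot {
    private final Set<String> notYetGranted = new HashSet<>();
    private int quorum;

    SimpleBallot(Collection<String> peers) {
        this.notYetGranted.addAll(peers);
        this.quorum = this.notYetGranted.size() / 2 + 1;
    }

    /** Counts a grant once per known peer; unknown or repeated peers are ignored. */
    void grant(String peer) {
        if (this.notYetGranted.remove(peer)) {
            this.quorum--;
        }
    }

    /** True once a majority of the configured peers has granted. */
    boolean isGranted() {
        return this.quorum <= 0;
    }

    public static void main(String[] args) {
        SimpleBallot ballot = new SimpleBallot(Arrays.asList("a", "b", "c"));
        ballot.grant("a"); // own vote
        ballot.grant("a"); // duplicate, ignored
        System.out.println(ballot.isGranted()); // prints false
        ballot.grant("b");
        System.out.println(ballot.isGranted()); // prints true
    }
}
```

Tracking which peers have already been counted plays the same role as the found flag in the excerpt: a retransmitted response cannot be counted twice.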