Raft Algorithm Implementation - Sofa-JRaft: Leader Election, Data Writes, Log Replication

Posted by Leo Han



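For the details of the Raft algorithm itself, see my earlier article on distributed consistency algorithms (two-phase commit, three-phase commit, Paxos, Raft, ZooKeeper leader election, the ZAB protocol, sequential consistency, the data write flow, node states and node roles).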

Here we look at how sofa-jraft, open-sourced by Alibaba's Ant Financial, implements Raft.
First, sofa-jraft has a few important roles:

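  • Node: represents a single server node
  • Ballot: holds the information for one round of voting
  • PeerId: represents one participant in a replication group
  • StateMachine: once data has been committed on a Node, the StateMachine's onApply method is invoked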

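Node also has several important timers:

  • electionTimer: the election timer. If the current leader goes down (the follower stops hearing from it), the node starts a preVote.
  • voteTimer: the vote timer. If a vote times out, the node starts a preVote again.
  • stepDownTimer: used by the leader to check whether the current node is still alive, to check whether any node in the cluster has gone offline, and to update the leader's timestamp.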

Leader Election Voting

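JRaft's election has two phases, preVote and vote. The preVote phase exists to stop a node that cannot keep in sync with the leader (for example, because it is partitioned away) from repeatedly starting elections and raising its own term until its term exceeds the leader's; that would force the leader to give up leadership and trigger a new round of elections.
preVote requires that a node receive grants from more than half of the nodes before it may start a real election.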
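
To make the motivation concrete, here is a toy model (not JRaft code) of a node partitioned away from the rest of a 5-node cluster that keeps hitting its election timeout. It assumes that every reachable peer would grant a pre-vote and ignores logs, leases and RPCs entirely; `TermInflationDemo` and `termAfterTimeouts` are hypothetical names.

```java
/**
 * Toy model, not JRaft code: an isolated follower keeps timing out.
 * Without pre-vote it bumps its term on every timeout; with pre-vote it only
 * bumps the term when a majority would grant the pre-vote.
 */
class TermInflationDemo {

    /** Returns the node's term after the given number of election timeouts. */
    static long termAfterTimeouts(long startTerm, int timeouts, boolean usePreVote,
                                  int clusterSize, int reachablePeers) {
        long term = startTerm;
        int quorum = clusterSize / 2 + 1;
        for (int i = 0; i < timeouts; i++) {
            // The node grants itself, plus every peer it can reach
            // (assumption: reachable peers always grant).
            int grants = 1 + reachablePeers;
            if (usePreVote && grants < quorum) {
                continue; // pre-vote failed: the term is left untouched
            }
            term++; // a real election always increments the term first
        }
        return term;
    }

    public static void main(String[] args) {
        // 5-node cluster, node cut off from every peer, 10 timeouts
        System.out.println(termAfterTimeouts(3, 10, false, 5, 0)); // 13
        System.out.println(termAfterTimeouts(3, 10, true, 5, 0));  // 3
    }
}
```

Without pre-vote the isolated node's term climbs by one per timeout, so when the partition heals it outranks the leader; with pre-vote its term stays put.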

JRaft's election is triggered by a timer timeout. In NodeImpl (the implementation class of Node), NodeImpl.init starts the electionTimer:

```java
this.electionTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs(),
        TIMER_FACTORY.getElectionTimer(this.options.isSharedElectionTimer(), name)) {
    @Override
    protected void onTrigger() {
        // Fired each time the (randomized) election timeout elapses
        handleElectionTimeout();
    }

    @Override
    protected int adjustTimeout(final int timeoutMs) {
        // Randomize the timeout so that followers do not all time out together
        return randomTimeout(timeoutMs);
    }
};

private void handleElectionTimeout() {
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        // Only a follower starts an election
        if (this.state != State.STATE_FOLLOWER) {
            return;
        }
        // The leader is still valid: nothing to do
        if (isCurrentLeaderValid()) {
            return;
        }
        resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT, "Lost connection from leader %s.",
            this.leaderId));

        // Judge whether to launch a election.
        if (!allowLaunchElection()) {
            return;
        }

        // preVote() takes over the write lock and releases it itself
        doUnlock = false;
        preVote();
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}
```
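
`adjustTimeout` delegates to `randomTimeout`, so each election round waits a slightly different time, which makes it unlikely that several followers time out at the same moment and split the vote. The sketch below assumes the randomized value is drawn uniformly from `[timeoutMs, timeoutMs + maxDelayMs)`; `ElectionTimeouts` and the `maxDelayMs` parameter are illustrative assumptions, not JRaft's exact API.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Sketch of a randomized election timeout (hypothetical helper, not JRaft's API). */
class ElectionTimeouts {

    /**
     * Returns a timeout in [timeoutMs, timeoutMs + maxDelayMs).
     * maxDelayMs is an assumed jitter bound and must be positive.
     */
    static int randomTimeout(int timeoutMs, int maxDelayMs) {
        if (maxDelayMs <= 0) {
            throw new IllegalArgumentException("maxDelayMs must be positive");
        }
        // nextInt(origin, bound): origin is inclusive, bound is exclusive
        return ThreadLocalRandom.current().nextInt(timeoutMs, timeoutMs + maxDelayMs);
    }

    public static void main(String[] args) {
        // Three followers with a 1000 ms base timeout and up to 1000 ms of jitter
        for (int i = 0; i < 3; i++) {
            System.out.println(randomTimeout(1000, 1000));
        }
    }
}
```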

handleElectionTimeout essentially just starts preVote. The main steps of one JRaft election are:

preVote ===> handlePreVoteRequest ===> electSelf ===> handleRequestVoteRequest

Let's first look at preVote:

```java
private void preVote() {
    // .....
    final LogId lastLogId = this.logManager.getLastLogId(true);
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        // pre_vote need defense ABA after unlock&writeLock
        if (oldTerm != this.currTerm) {
            LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm);
            return;
        }
        this.prevVoteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
        for (final PeerId peer : this.conf.listPeers()) {
            // Do not send a pre-vote request to itself
            if (peer.equals(this.serverId)) {
                continue;
            }
            if (!this.rpcService.connect(peer.getEndpoint())) {
                LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
                continue;
            }
            final OnPreVoteRpcDone done = new OnPreVoteRpcDone(peer, this.currTerm);
            done.request = RequestVoteRequest.newBuilder() //
                .setPreVote(true) // it's a pre-vote request.
                .setGroupId(this.groupId) //
                .setServerId(this.serverId.toString()) //
                .setPeerId(peer.toString()) //
                .setTerm(this.currTerm + 1) // next term
                .setLastLogIndex(lastLogId.getIndex()) //
                .setLastLogTerm(lastLogId.getTerm()) //
                .build();
            this.rpcService.preVote(peer.getEndpoint(), done.request, done);
        }
        // The node grants its own pre-vote; in a single-node group this alone is a majority
        this.prevVoteCtx.grant(this.serverId);
        if (this.prevVoteCtx.isGranted()) {
            // electSelf() takes over the write lock and releases it itself
            doUnlock = false;
            electSelf();
        }
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}
```

As the code shows, preVote sends a RequestVoteRequest to every node in the configuration except itself. The main fields are:

```java
RequestVoteRequest.newBuilder() //
    .setPreVote(true) // it's a pre-vote request.
    .setGroupId(this.groupId) //
    .setServerId(this.serverId.toString()) //
    .setPeerId(peer.toString()) //
    .setTerm(this.currTerm + 1) // next term
    .setLastLogIndex(lastLogId.getIndex()) //
    .setLastLogTerm(lastLogId.getTerm()) //
    .build();
```

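Note that at this point the node does not set its own currTerm to currTerm + 1; it only puts currTerm + 1 into the request. This differs from the real election, where the node first increments its own currTerm (currTerm++).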
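
The difference can be sketched with a toy node (`ToyNode` is hypothetical, not JRaft's `NodeImpl`). The pre-vote path reads the term, drops the lock for a slow read (standing in for `getLastLogId`), re-checks that the term did not change meanwhile, and only advertises `currTerm + 1`; the election path actually increments `currTerm`.

```java
import java.util.concurrent.locks.ReentrantLock;

/** Toy node, not JRaft code: pre-vote advertises currTerm + 1, electSelf really bumps currTerm. */
class ToyNode {
    private final ReentrantLock lock = new ReentrantLock();
    long currTerm;

    ToyNode(long term) {
        this.currTerm = term;
    }

    /** Returns the term placed in a pre-vote request, or -1 if the term changed while unlocked. */
    long preVoteRequestTerm(Runnable slowLogRead) {
        long oldTerm;
        lock.lock();
        try {
            oldTerm = currTerm;
        } finally {
            lock.unlock();
        }
        slowLogRead.run(); // e.g. reading the last log id without holding the lock
        lock.lock();
        try {
            if (oldTerm != currTerm) {
                return -1; // the term moved while unlocked: abandon this pre-vote
            }
            return currTerm + 1; // advertised "next term"; currTerm itself is not modified
        } finally {
            lock.unlock();
        }
    }

    /** A real election: the node increments its own term first and returns it. */
    long electSelf() {
        lock.lock();
        try {
            return ++currTerm;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        ToyNode node = new ToyNode(5);
        System.out.println(node.preVoteRequestTerm(() -> { })); // 6
        System.out.println(node.currTerm);                      // 5
        System.out.println(node.electSelf());                   // 6
    }
}
```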

Now let's see how the other nodes handle this request:

```java
public Message handlePreVoteRequest(final RequestVoteRequest request) {
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        if (!this.state.isActive()) {
            LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
            return RpcFactoryHelper //
                .responseFactory() //
                .newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
                    "Node %s is not in active state, state %s.", getNodeId(), this.state.name());
        }
        final PeerId candidateId = new PeerId();
        if (!candidateId.parse(request.getServerId())) {
            LOG.warn("Node {} received PreVoteRequest from {} serverId bad format.", getNodeId(),
                request.getServerId());
            return RpcFactoryHelper //
                .responseFactory() //
                .newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
                    "Parse candidateId failed: %s.", request.getServerId());
        }
        boolean granted = false;
        // noinspection ConstantConditions
        do {
            if (!this.conf.contains(candidateId)) {
                LOG.warn("Node {} ignore PreVoteRequest from {} as it is not in conf <{}>.", getNodeId(),
                    request.getServerId(), this.conf);
                break;
            }
            if (this.leaderId != null && !this.leaderId.isEmpty() && isCurrentLeaderValid()) {
                LOG.info(
                    "Node {} ignore PreVoteRequest from {}, term={}, currTerm={}, because the leader {}'s lease is still valid.",
                    getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, this.leaderId);
                break;
            }
            if (request.getTerm() < this.currTerm) {
                LOG.info("Node {} ignore PreVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
                    request.getServerId(), request.getTerm(), this.currTerm);
                // A follower replicator may not be started when this node become leader, so we must check it.
                checkReplicator(candidateId);
                break;
            }
            // A follower replicator may not be started when this node become leader, so we must check it.
            // check replicator state
            checkReplicator(candidateId);

            // Release the lock while reading the last log id, then re-acquire it
            doUnlock = false;
            this.writeLock.unlock();

            final LogId lastLogId = this.logManager.getLastLogId(true);

            doUnlock = true;
            this.writeLock.lock();
            // Grant only if the candidate's log is at least as up to date as this node's
            final LogId requestLastLogId = new LogId(request.getLastLogIndex(), request.getLastLogTerm());
            granted = requestLastLogId.compareTo(lastLogId) >= 0;

            LOG.info(
                "Node {} received PreVoteRequest from {}, term={}, currTerm={}, granted={}, requestLastLogId={}, lastLogId={}.",
                getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, granted, requestLastLogId,
                lastLogId);
        } while (false);

        return RequestVoteResponse.newBuilder() //
            .setTerm(this.currTerm) //
            .setGranted(granted) //
            .build();
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}
```

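When another node receives a PreVoteRequest, it checks the following, in this order:

  1. If the candidate is not in this node's current configuration, it returns granted=false.
  2. If this node's leader is still alive (the leader's lease is still valid), it returns granted=false.
  3. If the term in the preVote request is smaller than this node's term, it returns granted=false.
  4. If the candidate's last log (a LogId, compared by term first and then by index) is older than this node's last log, it returns granted=false.
  5. If none of the above applies, it returns granted=true.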
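
The rules above can be written as a pure function. This is a sketch, not JRaft's code: `LogPos`, `PreVoteRules` and `grantPreVote` are hypothetical names, the two boolean inputs stand in for the configuration and leader-lease checks, and `LogPos` is ordered by the usual Raft "up-to-date" rule (term first, then index).

```java
/** Hypothetical stand-in for a log position (term, index). */
final class LogPos implements Comparable<LogPos> {
    final long term;
    final long index;

    LogPos(long term, long index) {
        this.term = term;
        this.index = index;
    }

    @Override
    public int compareTo(LogPos o) {
        // A higher last term wins; on equal terms the longer log wins
        int c = Long.compare(this.term, o.term);
        return c != 0 ? c : Long.compare(this.index, o.index);
    }
}

/** Sketch of the pre-vote grant decision as a pure function. */
class PreVoteRules {

    static boolean grantPreVote(boolean candidateInConf, boolean leaderLeaseValid,
                                long requestTerm, long currTerm,
                                LogPos candidateLast, LogPos localLast) {
        if (!candidateInConf) return false;        // candidate is not a member
        if (leaderLeaseValid) return false;        // still hearing from a live leader
        if (requestTerm < currTerm) return false;  // stale candidate
        return candidateLast.compareTo(localLast) >= 0; // log at least as up to date
    }

    public static void main(String[] args) {
        LogPos local = new LogPos(3, 10);
        System.out.println(grantPreVote(true, false, 4, 3, new LogPos(4, 1), local)); // true
        System.out.println(grantPreVote(true, false, 4, 3, new LogPos(3, 9), local)); // false
    }
}
```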

That covers how the other nodes handle a PreVoteRequest. Next, let's see how the node that started the preVote handles their responses:

```java
public void handlePreVoteResponse(final PeerId peerId, final long term, final RequestVoteResponse response) {
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        if (this.state != State.STATE_FOLLOWER) {
            LOG.warn("Node {} received invalid PreVoteResponse from {}, state not in STATE_FOLLOWER but {}.",
                getNodeId(), peerId, this.state);
            return;
        }
        if (term != this.currTerm) {
            LOG.warn("Node {} received invalid PreVoteResponse from {}, term={}, currTerm={}.", getNodeId(),
                peerId, term, this.currTerm);
            return;
        }
        if (response.getTerm() > this.currTerm) {
            LOG.warn("Node {} received invalid PreVoteResponse from {}, term {}, expect={}.", getNodeId(), peerId,
                response.getTerm(), this.currTerm);
            stepDown(response.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
                "Raft node receives higher term pre_vote_response."));
            return;
        }
        LOG.info("Node {} received PreVoteResponse from {}, term={}, granted={}.", getNodeId(), peerId,
            response.getTerm(), response.getGranted());
        // check granted quorum?
        if (response.getGranted()) {
            this.prevVoteCtx.grant(peerId);
            if (this.prevVoteCtx.isGranted()) {
                // electSelf() takes over the write lock and releases it itself
                doUnlock = false;
                electSelf();
            }
        }
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}
```

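Here the parameter term is this node's term from before the vote, and peerId identifies the node the preVote was sent to (the responder). The checks are:

  1. If this node is no longer a follower, the response is ignored.
  2. If the current term differs from the term before the vote, a new round has started in the meantime, so the response is stale and is discarded.
  3. If the term in the response (the responder's term) is larger than this node's term, the responder is in a later term; this node steps down to that term (stepDown) and returns.
  4. If the response grants the vote (response.getGranted() is true), the grant is recorded and the node checks whether more than half of the nodes have granted this round; if so, it calls electSelf().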
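
The candidate-side bookkeeping can be sketched as a small tally. `PreVoteTally` and `PreVoteOutcome` are hypothetical names; stepping down and calling `electSelf()` are only reported as outcomes here, and the node's own grant is counted up front, just as `preVote()` calls `this.prevVoteCtx.grant(this.serverId)`.

```java
import java.util.HashSet;
import java.util.Set;

/** Result of processing one pre-vote response (hypothetical names). */
enum PreVoteOutcome { IGNORED, STEP_DOWN, PENDING, START_ELECTION }

/** Sketch of the candidate side of pre-vote, loosely modelled on handlePreVoteResponse. */
class PreVoteTally {
    private final long termAtSend; // currTerm when this pre-vote round started
    private final int quorum;
    private final Set<String> granted = new HashSet<>();

    PreVoteTally(long termAtSend, int clusterSize, String self) {
        this.termAtSend = termAtSend;
        this.quorum = clusterSize / 2 + 1;
        granted.add(self); // a node always grants its own pre-vote
    }

    PreVoteOutcome onResponse(long currTerm, String peer, long responseTerm, boolean grantedFlag) {
        if (currTerm != termAtSend) return PreVoteOutcome.IGNORED;    // stale round
        if (responseTerm > currTerm) return PreVoteOutcome.STEP_DOWN; // responder has a higher term
        if (!grantedFlag) return PreVoteOutcome.PENDING;
        granted.add(peer); // a Set ignores repeated grants from the same peer
        return granted.size() >= quorum ? PreVoteOutcome.START_ELECTION : PreVoteOutcome.PENDING;
    }

    public static void main(String[] args) {
        PreVoteTally tally = new PreVoteTally(3, 5, "n1");
        System.out.println(tally.onResponse(3, "n2", 3, true)); // PENDING
        System.out.println(tally.onResponse(3, "n3", 3, true)); // START_ELECTION
    }
}
```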

NodeImpl holds two Ballots: prevVoteCtx for preVote and voteCtx for the real vote. When preVote starts, prevVoteCtx is initialized:

```java
public boolean init(final Configuration conf, final Configuration oldConf) {
    this.peers.clear();
    this.oldPeers.clear();
    this.quorum = this.oldQuorum = 0;
    int index = 0;
    if (conf != null) {
        for (final PeerId peer : conf) {
            this.peers.add(new UnfoundPeerId(peer, index++, false));
        }
    }

    // Majority of the current configuration
    this.quorum = this.peers.size() / 2 + 1;
    if (oldConf == null) {
        return true;
    }
    index = 0;
    for (final PeerId peer : oldConf) {
        this.oldPeers.add(new UnfoundPeerId(peer, index++, false));
    }

    // During a membership change the old configuration gets its own majority as well
    this.oldQuorum = this.oldPeers.size() / 2 + 1;
    return true;
}
```

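So init sets the Ballot's quorum to (number of peers in the current configuration) / 2 + 1; when an old configuration is present (during a membership change), oldQuorum is computed the same way over the old peers.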
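
Quorum counting, including the old configuration during a membership change, can be sketched as follows. `SimpleBallot` is a hypothetical, simplified stand-in for JRaft's `Ballot`: it assumes a vote passes only when the new-configuration quorum and, if an old configuration exists, the old-configuration quorum are both reached (Raft's joint-consensus rule).

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Simplified ballot, not JRaft's Ballot: counts grants against one or two configurations. */
class SimpleBallot {
    private final Set<String> peers;
    private final Set<String> oldPeers;
    private int quorum;
    private int oldQuorum;

    SimpleBallot(List<String> conf, List<String> oldConf) {
        this.peers = new HashSet<>(conf);
        this.oldPeers = oldConf == null ? new HashSet<>() : new HashSet<>(oldConf);
        this.quorum = peers.size() / 2 + 1;
        this.oldQuorum = oldConf == null ? 0 : oldPeers.size() / 2 + 1; // 0: no old config to satisfy
    }

    void grant(String peer) {
        // remove() is true only the first time, so a repeated grant is not counted twice
        if (peers.remove(peer)) quorum--;
        if (oldPeers.remove(peer)) oldQuorum--;
    }

    boolean isGranted() {
        return quorum <= 0 && oldQuorum <= 0;
    }

    public static void main(String[] args) {
        SimpleBallot ballot = new SimpleBallot(List.of("a", "b", "c"), null);
        ballot.grant("a");
        System.out.println(ballot.isGranted()); // false
        ballot.grant("b");
        System.out.println(ballot.isGranted()); // true
    }
}
```
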
Now let's see how, once a preVote request has been granted, the node decides whether to start an election. Near the end of handlePreVoteResponse it calls this.prevVoteCtx.grant(peerId):

```java
public void grant(final PeerId peerId) {
    grant(peerId, new PosHint());
}

public PosHint grant(final PeerId peerId, final PosHint hint) {
    UnfoundPeerId peer = findPeer(peerId, this.peers, hint.pos0);
    if (peer != null) {
        if (!peer.found) {
            peer.found = true;
```
