Raft协议之领导者选举
Posted 技术和生活小站
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Raft协议之领导者选举相关的知识,希望对你有一定的参考价值。
pic by mthrworld from Instagram
Raft是一种基于日志复制的分布式一致性算法,它提供了和Paxos算法相同的功能和性能,但更容易理解并且更容易构建实际的系统。Raft将一致性算法分解成了几个关键模块,包括领导者选举、日志复制和安全性。
Raft算法定义了Follower、Candidate和Leader三个角色,服务节点可以在这三个状态进行切换。
Raft算法最著名的实现是etcd/raft和hashicorp/raft,前者是etcd的实现基础,后者是consul的实现基础。它们都是用Go语言实现的,更多语言实现请参见 https://raft.github.io。
copycat是raft的一个Java实现,实现了领导者选举、日志复制、角色切换和日志压缩等功能。
下面结合源码分析一下copycat对领导者选举的实现过程。
通过CopycatServer.builder()方法构造CopycatServer实例,再通过bootstrap()方法启动。
CopycatServer.Builder builder = CopycatServer.builder(member.clientAddress(), member.serverAddress())
.withType(member.type())
.withTransport(new NettyTransport())
.withStorage(Storage.builder()
.withStorageLevel(StorageLevel.DISK)
.withDirectory(new File(String.format("target/fuzz-logs/%d", member.address().hashCode())))
.withMaxSegmentSize(randomNumber(1024 * 1024 * 7) + (1024 * 1024))
.withMaxEntriesPerSegment(randomNumber(10000) + 1000)
.withCompactionThreads(randomNumber(4) + 1)
.withCompactionThreshold(Math.random() / (double) 2)
.withEntryBufferSize(randomNumber(10000) + 1)
.withFlushOnCommit(randomBoolean())
.withMinorCompactionInterval(Duration.ofSeconds(randomNumber(30) + 15))
.withMajorCompactionInterval(Duration.ofSeconds(randomNumber(60) + 60))
.build())
.withStateMachine(FuzzStateMachine::new);
CopycatServer server = builder.build();
server.serializer().disableWhitelist();
server.bootstrap(member.serverAddress);
上述bootstrap()方法会调用ClusterState类的bootstrap()方法,注意所有ServerMember的type是ACTIVE类型的。
// Create a set of cluster members, excluding the local member which is joining a cluster.
Set<Member> activeMembers = cluster.stream()
.filter(m -> !m.equals(member.serverAddress()))
.map(m -> new ServerMember(Member.Type.ACTIVE, m, null, member.updated()))
.collect(Collectors.toSet());
ClusterState的join()方法里继续调用:
// Transition the server to the appropriate state for the local member type.
context.transition(member.type());
这里的context是ServerContext类的实例,member.type()是ACTIVE。transition()方法实现如下:
protected void transition(Member.Type type) {
switch (type) {
case ACTIVE:
if (!(state instanceof ActiveState)) {
transition(CopycatServer.State.FOLLOWER);
}
break;
case PASSIVE:
if (this.state.type() != CopycatServer.State.PASSIVE) {
transition(CopycatServer.State.PASSIVE);
}
break;
case RESERVE:
if (this.state.type() != CopycatServer.State.RESERVE) {
transition(CopycatServer.State.RESERVE);
}
break;
default:
if (this.state.type() != CopycatServer.State.INACTIVE) {
transition(CopycatServer.State.INACTIVE);
}
break;
}
}
/**
* Transition handler.
*/
public void transition(CopycatServer.State state) {
......
this.state = createState(state);
this.state.open().get();
. .....
}
/**
* Creates an internal state for the given state type.
*/
private AbstractState createState(CopycatServer.State state) {
switch (state) {
case INACTIVE:
return new InactiveState(this);
case RESERVE:
return new ReserveState(this);
case PASSIVE:
return new PassiveState(this);
case FOLLOWER:
return new FollowerState(this);
case CANDIDATE:
return new CandidateState(this);
case LEADER:
return new LeaderState(this);
default:
throw new AssertionError();
}
}
ACTIVE类型下对应FOLLOWER的状态,通过createState()创建了FollowerState对象,然后调用FollowerState对象的open()方法。FollowerState会调用sendPollRequest()方法去集群上检查所有成员的状态,并决定该CopycatServer是否切换到Candidate角色。
此时集群还没有其它的成员,第一个CopycatServer将自动切换到Candidata角色,调用context.transition(CopycatServer.State.CANDIDATE)方法。
通过transition(CANDIDATE)生成了CandidataState对象,再通过open()调用sendVoteRequests()方法发起投票。
该候选人会先把自己的任期号term加1。
// When the election timer is reset, increment the current term and restart the election.
context.setTerm(context.getTerm() + 1).setLastVotedFor(context.getCluster().member().id());
如果集群还没有其它成员,则该候选人自动成为leader。
// If there are no other members in the cluster, immediately transition to leader.
if (votingMembers.isEmpty()) {
LOGGER.trace("{} - Single member cluster. Transitioning directly to leader.", context.getCluster().member().address());
context.transition(CopycatServer.State.LEADER);
return;
}
如果集群里有其它成员,则向集群所有成员发送投票请求。
// Send vote requests to all nodes. The vote request that is sent
// to this node will be automatically successful.
// First check if the quorum is null. If the quorum isn't null then that
// indicates that another vote is already going on.
final Quorum quorum = new Quorum(context.getClusterState().getQuorum(), (elected) -> {
complete.set(true);
if (elected) {
context.transition(CopycatServer.State.LEADER);
} else {
context.transition(CopycatServer.State.FOLLOWER);
}
});
// Once we got the last log term, iterate through each current member
// of the cluster and vote each member for a vote.
for (ServerMember member : votingMembers) {
LOGGER.debug("{} - Requesting vote from {} for term {}", context.getCluster().member().address(), member, context.getTerm());
VoteRequest request = VoteRequest.builder()
.withTerm(context.getTerm())
.withCandidate(context.getCluster().member().id())
.withLogIndex(lastIndex)
.withLogTerm(lastTerm)
.build();
context.getConnections().getConnection(member.serverAddress()).thenAccept(connection -> {
connection.<VoteRequest, VoteResponse>sendAndReceive(request).whenCompleteAsync((response, error) -> {
context.checkThread();
if (isOpen() && !complete.get()) {
if (error != null) {
LOGGER.warn(error.getMessage());
quorum.fail();
} else {
if (response.term() > context.getTerm()) {
LOGGER.trace("{} - Received greater term from {}", context.getCluster().member().address(), member);
context.setTerm(response.term());
complete.set(true);
context.transition(CopycatServer.State.FOLLOWER);
} else if (!response.voted()) {
LOGGER.trace("{} - Received rejected vote from {}", context.getCluster().member().address(), member);
quorum.fail();
} else if (response.term() != context.getTerm()) {
LOGGER.trace("{} - Received successful vote for a different term from {}", context.getCluster().member().address(), member);
quorum.fail();
} else {
LOGGER.trace("{} - Received successful vote from {}", context.getCluster().member().address(), member);
quorum.succeed();
}
}
}
}, context.getThreadContext().executor());
});
}
如果该候选人赢得了选举,则通过context.transition(CopycatServer.State.LEADER)成为领导者,否则继续回退到FOLLOWER角色。
当一个候选人从整个集群的大多数服务器节点获得了针对同一个任期号的选票,那么他就赢得了这次选举并成为领导人。每一个服务器最多会对一个任期号投出一张选票,按照先来先服务的原则。要求大多数选票的规则确保了最多只会有一个候选人赢得此次选举。一旦候选人赢得选举,他就立即成为领导人。然后他会向其他的服务器发送心跳消息来建立自己的权威并且阻止新的领导人的产生。
在等待投票的时候,候选人可能会从其他的服务器接收到声明它是领导人的附加日志项RPC。如果这个领导人的任期号(包含在此次的RPC中)不小于候选人当前的任期号,那么候选人会承认领导人合法并回到跟随者状态。如果此次RPC中的任期号比自己小,那么候选人就会拒绝这次的RPC并且继续保持候选人状态。
有一种可能的结果是候选人既没有赢得选举也没有输:如果有多个跟随者同时成为候选人,那么选票可能会被瓜分以至于没有候选人可以赢得大多数人的支持。当这种情况发生的时候,每一个候选人都会超时,然后通过增加当前任期号来开始一轮新的选举。然而,没有其他机制的话,选票可能会被无限的重复瓜分。
Raft算法使用随机选举超时时间的方法来确保很少会发生选票瓜分的情况,就算发生也能很快的解决。为了阻止选票起初就被瓜分,选举超时时间是从一个固定的区间(例如 150-300 毫秒)随机选择。这样可以把服务器都分散开以至于在大多数情况下只有一个服务器会选举超时;然后他赢得选举并在其他服务器超时之前发送心跳包。同样的机制被用在选票瓜分的情况下。每一个候选人在开始一次选举的时候会重置一个随机的选举超时时间,然后在超时时间内等待投票的结果;这样减少了在新的选举中另外的选票瓜分的可能性。
当候选人成为领导者时,会创建LeaderState实例,并调用open方法,写入InitializeEntry日志,然后调用context.getStateMachine().apply(resultIndex)在状态机中应用该状态。最后通过startAppendTimer()循环调用LeaderAppender类的appendMembers()方法,告诉其它节点“我是领导者”。
其它节点收到消息后,则以Follower身份不断接收Leader的消息,进行日志同步复制,Copycat的领导者选举过程到此结束。
关于raft算法的更多细节,可以参考 https://github.com/maemual/raft-zh_cn/blob/master/raft-zh_cn.md 这篇文章。
后面再分析日志同步和日志压缩过程。点击“阅读原文”可以查看更多文章。
以上是关于Raft协议之领导者选举的主要内容,如果未能解决你的问题,请参考以下文章