zookeeper的leader选举原理和底层源码实现超级详解
Posted huisheng_qaq
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper的leader选举原理和底层源码实现超级详解相关的知识,希望对你有一定的参考价值。
zookeeper选举详解
一,zookeeper选举原理
1,源码下载
在这个https://github.com/apache/zookeeper里面把源码下载即可,这里推荐版本为3.5.8
源码下载完成之后,在这个zookeeper-server的模块下面,在version包下面新建一个info的接口
其内容如下,如果会有编译报错就加入这个接口,没有的话也可以不加。
public interface Info
int MAJOR = 1;
int MINOR = 0;
int MICRO = 0;
String QUALIFIER = null;
int REVISION = -1 ;
String REVISION_HASH = "1";
String BUILD_DATE = "2022‐09‐03";
2,zookeeper集群选举流程
2.1,zookeeper集群启动以及配置加载
1,由于这个zookeeper的启动是通过一个./zkServer.sh的脚本实现整个服务的启动的,因此可以发现脚本里面主要是通过这个QuorumPeerMain 类来作为一个集群的主启动类,接下来就是主要分析一下这个类。
ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"
2,在这个类里面,有一个main的入口方法
@InterfaceAudience.Public
public class
public static void main(String[] args)
QuorumPeerMain main = new QuorumPeerMain();
try
main.initializeAndRun(args);
catch(Execption e)
...
3,接下来就是一个重点,主要查看这个initializeAndRun的这个方法,这个方法主要会解析一下配置文件,清理一些文件
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException
QuorumPeerConfig config = new QuorumPeerConfig();
//解析配置文件,主要是解析这个zoo.cfg的配置文件
if (args.length == 1)
config.parse(args[0]);
//定时清理一些文件目录
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
//如果是分布式集群环境,那么走这个逻辑
if (args.length == 1 && config.isDistributed())
runFromConfig(config);
//如果是单机状态,那么走这个逻辑
else
ZooKeeperServerMain.main(args);
4,那么可以查看这个分布式环境下的方法所走的流程,主要是查看这个runFromConfig方法,主要是会创建一个nio实例或者创建一个netty的一个工厂,将一些解析的文件存储到一个QuorumPeer的对象里面
public void runFromConfig(QuorumPeerConfig config) throws IOException,AdminServerException
try
ServerCnxnFactory cnxnFactory = null;
ServerCnxnFactory secureCnxnFactory = null;
if (config.getClientPortAddress() != null)
//创建nio或者netty工厂
cnxnFactory = ServerCnxnFactory.createFactory();
//监听客户端的端口
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns(),
false);
quorumPeer = getQuorumPeer();
//将解析出来的配置文件的值存到这个quorumPeer对象里面
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setConfigFileName(config.getConfigFilename());
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
//属性填充之后就进行一个初始化
quorumPeer.initialize();
//开启这个zookeeper的集群
quorumPeer.start();
quorumPeer.join();
5,接下来主要查看这个start启动方法,会将已有的数据从磁盘加载到内存里面
@Override
public synchronized void start()
if (!getView().containsKey(myid))
throw new RuntimeException("My id " + myid + " not in the peer list");
//加载zookeeper里面已有的数据文件
//比如一些快照文件,需要从磁盘加载到内存里面
loadDataBase();
//启动刚刚初始化的这个CNX的工厂
startServerCnxnFactory();
try
adminServer.start();
catch (AdminServerException e)
LOG.warn("Problem starting AdminServer", e);
System.out.println(e);
//和选举有关的算法
startLeaderElection();
//选举的具体实现
super.start();
6,结点的几个状态,当集群没有选举出来这个leader的时候,这个结点处于一个LOOKING状态;当主结点选举出来之后,这个主结点是LEADING状态,这个从结点是FOLLOWING状态;
public enum ServerState
LOOKING, FOLLOWING, LEADING, OBSERVING;
2.2,leader选举工作准备开始
7,接下来就是一个重点和这个选举主结点有关的方法startLeaderElection
synchronized public void startLeaderElection()
try
//观望状态
if (getPeerState() == ServerState.LOOKING)
//初始化一个投票对象,先给自己投一票
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
catch(IOException e)
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
if (electionType == 0)
try
udpSocket = new DatagramSocket(getQuorumAddress().getPort());
responder = new ResponderThread();
responder.start();
catch (SocketException e)
throw new RuntimeException(e);
//快速选举的一个算法
this.electionAlg = createElectionAlgorithm(electionType);
8,这个选举的算法如下,这个默认传进来的参数为3。因此这个默认选用的就是这个FastLeaderElection的这个算法。主要是会初始化一个选举数据的一个管理器,然后会通过一个bio的方式对这个选举进行一个监听的操作,最后会有一个选举的具体的一个算法
protected Election createElectionAlgorithm(int electionAlgorithm)
Election le=null;
switch (electionAlgorithm)
case 0:
le = new LeaderElection(this);
break;
case 1:
le = new AuthFastLeaderElection(this);
break;
case 2:
le = new AuthFastLeaderElection(this, true);
break;
case 3:
//初始化一个QuorumCnxManager类
QuorumCnxManager qcm = createCnxnManager();
QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
if (oldQcm != null)
oldQcm.halt();
//创建一个线程
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null)
//开启线程,底层会去绑定一个端口,通过这个bio的方式进行一个选票的逻辑
listener.start();
//选举的逻辑
FastLeaderElection fle = new FastLeaderElection(this, qcm);
fle.start();
le = fle;
else
LOG.error("Null listener when initializing cnx manager");
break;
default:
assert false;
return le;
9,这个算法的逻辑方法FastLeaderElection的如下,主要就是在里面创建了几个链表类型的阻塞队列。阻塞队列就是为了后面加这个消息存放在这几个阻塞队列里面。
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager)
this.stop = false;
this.manager = manager;
starter(self, manager);
private void starter(QuorumPeer self, QuorumCnxManager manager)
this.self = self;
proposedLeader = -1;
proposedZxid = -1;
sendqueue = new LinkedBlockingQueue<ToSend>();
recvqueue = new LinkedBlockingQueue<Notification>();
this.messenger = new Messenger(manager);
10,接下来会有一个fle.start();的这个方法,里面的主要方法实现如下
void start()
this.wsThread.start();
this.wrThread.start();
Messenger(QuorumCnxManager manager)
//发起选票的线程
this.ws = new WorkerSender(manager);
this.wsThread = new Thread(this.ws,
"WorkerSender[myid=" + self.getId() + "]");
this.wsThread.setDaemon(true);
//接收选票的线程
this.wr = new WorkerReceiver(manager);
this.wrThread = new Thread(this.wr,
"WorkerReceiver[myid=" + self.getId() + "]");
this.wrThread.setDaemon(true);
11,接下来回到第五步里面的这个 super.start() 的这个方法,其run方法的主要的核心如下,就是一个具体的投票的一个业务逻辑。主要会判断当前机器的一个状态,是looking状态还是已经选举成功的状态
try
while (running)
switch (getPeerState())
//默认是looking观望状态,还在找leader
case LOOKING:
LOG.info("LOOKING");
if (Boolean.getBoolean("readonlymode.enabled"))
LOG.info("Attempting to start ReadOnlyZooKeeperServer");
final ReadOnlyZooKeeperServer roZk =
new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);
Thread roZkMgr = new Thread()
public void run()
try
sleep(Math.max(2000, tickTime));
if (ServerState.LOOKING.equals(getPeerState()))
roZk.startup();
catch (InterruptedException e)
catch (Exception e)
;
try
roZkMgr.start();
reconfigFlagClear();
if (shuttingDownLE)
shuttingDownLE = false;
startLeaderElection();
setCurrentVote(makeLEStrategy().lookForLeader());
catch (Exception e)
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
finally
roZkMgr.interrupt();
roZk.shutdown();
else
try
reconfigFlagClear();
if (shuttingDownLE)
shuttingDownLE = false;
startLeaderElection();
//设置一个当前的投票,就是给自己投一票
setCurrentVote(makeLEStrategy().lookForLeader());
catch (Exception e)
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
break;
case OBSERVING:
try
LOG.info("OBSERVING");
setObserver(makeObserver(logFactory));
observer.observeLeader();
catch (Exception e)
LOG.warn("Unexpected exception",e );
finally
observer.shutdown();
setObserver(null);
updateServerState();
break;
case FOLLOWING:
try
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
catch (Exception e)
LOG.warn("Unexpected exception",e);
finally
follower.shutdown();
setFollower(null);
updateServerState();
break;
case LEADING:
LOG.info("LEADING");
try
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
catch (Exception e)
LOG.warn("Unexpected exception",e);
finally
if (leader != null)
leader.shutdown("Forcing shutdown");
setLeader(null);
updateServerState();
break;
start_fle = Time.currentElapsedTime();
12,这个设置投票的方法如下,主要是在这个lookForLeader里面,从这一步开始,就是选票的正式开始
public Vote lookForLeader() throws 以上是关于zookeeper的leader选举原理和底层源码实现超级详解的主要内容,如果未能解决你的问题,请参考以下文章