zookeeper的leader选举原理和底层源码实现超级详解

Posted huisheng_qaq

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper的leader选举原理和底层源码实现超级详解相关的知识,希望对你有一定的参考价值。

zookeeper选举详解

一,zookeeper选举原理

1,源码下载

在这个https://github.com/apache/zookeeper里面把源码下载即可,这里推荐版本为3.5.8

源码下载完成之后,在这个zookeeper-server的模块下面,在version包下面新建一个info的接口

其内容如下,如果会有编译报错就加入这个接口,没有的话也可以不加。

public interface Info 
     int MAJOR = 1;
     int MINOR = 0;
     int MICRO = 0;
     String QUALIFIER = null;
     int REVISION = -1 ;
     String REVISION_HASH = "1";
     String BUILD_DATE = "2022‐09‐03";

2,zookeeper集群选举流程

2.1,zookeeper集群启动以及配置加载

1,由于这个zookeeper的启动是通过一个./zkServer.sh的脚本实现整个服务的启动的,因此可以发现脚本里面主要是通过这个QuorumPeerMain 类来作为一个集群的主启动类,接下来就是主要分析一下这个类。

ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"

2,在这个类里面,有一个main的入口方法

@InterfaceAudience.Public
public class  
    public static void main(String[] args) 
        QuorumPeerMain main = new QuorumPeerMain();
        try  
            main.initializeAndRun(args);
         catch(Execption e)
            ...
        
    

3,接下来就是一个重点,主要查看这个initializeAndRun的这个方法,这个方法主要会解析一下配置文件,清理一些文件

protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException 
    QuorumPeerConfig config = new QuorumPeerConfig();
    //解析配置文件,主要是解析这个zoo.cfg的配置文件
    if (args.length == 1) 
        config.parse(args[0]);
    
    //定时清理一些文件目录
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
            .getDataDir(), config.getDataLogDir(), config
            .getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();
	//如果是分布式集群环境,那么走这个逻辑
    if (args.length == 1 && config.isDistributed()) 
        runFromConfig(config);
    
    //如果是单机状态,那么走这个逻辑
    else 
        ZooKeeperServerMain.main(args);
    

4,那么可以查看这个分布式环境下的方法所走的流程,主要是查看这个runFromConfig方法,主要是会创建一个nio实例或者创建一个netty的一个工厂,将一些解析的文件存储到一个QuorumPeer的对象里面

public void runFromConfig(QuorumPeerConfig config) throws IOException,AdminServerException
	try 
		ServerCnxnFactory cnxnFactory = null;
		ServerCnxnFactory secureCnxnFactory = null;
		if (config.getClientPortAddress() != null) 
            //创建nio或者netty工厂
		    cnxnFactory = ServerCnxnFactory.createFactory();
            //监听客户端的端口
		    cnxnFactory.configure(config.getClientPortAddress(),
		            config.getMaxClientCnxns(),
		            false);
		
        
        quorumPeer = getQuorumPeer();
        //将解析出来的配置文件的值存到这个quorumPeer对象里面
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setConfigFileName(config.getConfigFilename());
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        
        //属性填充之后就进行一个初始化
        quorumPeer.initialize();
        //开启这个zookeeper的集群
        quorumPeer.start();
        quorumPeer.join();
    

5,接下来主要查看这个start启动方法,会将已有的数据从磁盘加载到内存里面

@Override
public synchronized void start() 
    if (!getView().containsKey(myid)) 
        throw new RuntimeException("My id " + myid + " not in the peer list");
     
    //加载zookeeper里面已有的数据文件
	//比如一些快照文件,需要从磁盘加载到内存里面
    loadDataBase();
    //启动刚刚初始化的这个CNX的工厂
    startServerCnxnFactory();
    try 
        adminServer.start();
     catch (AdminServerException e) 
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    
    //和选举有关的算法
    startLeaderElection();
    //选举的具体实现
    super.start();

6,结点的几个状态,当集群没有选举出来这个leader的时候,这个结点处于一个LOOKING状态;当主结点选举出来之后,这个主结点是LEADING状态,这个从结点是FOLLOWING状态;

public enum ServerState 
    LOOKING, FOLLOWING, LEADING, OBSERVING;

2.2,leader选举工作准备开始

7,接下来就是一个重点和这个选举主结点有关的方法startLeaderElection

synchronized public void startLeaderElection() 
   	try 
        //观望状态
   	    if (getPeerState() == ServerState.LOOKING) 
            //初始化一个投票对象,先给自己投一票
   	        currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
   	    
   	 catch(IOException e) 
   	    RuntimeException re = new RuntimeException(e.getMessage());
   	    re.setStackTrace(e.getStackTrace());
   	    throw re;
   	
    if (electionType == 0) 
        try 
            udpSocket = new DatagramSocket(getQuorumAddress().getPort());
            responder = new ResponderThread();
            responder.start();
         catch (SocketException e) 
            throw new RuntimeException(e);
        
    
    //快速选举的一个算法
    this.electionAlg = createElectionAlgorithm(electionType);

8,这个选举的算法如下,这个默认传进来的参数为3。因此这个默认选用的就是这个FastLeaderElection的这个算法。主要是会初始化一个选举数据的一个管理器,然后会通过一个bio的方式对这个选举进行一个监听的操作,最后会有一个选举的具体的一个算法

protected Election createElectionAlgorithm(int electionAlgorithm)
    Election le=null;
    switch (electionAlgorithm) 
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        //初始化一个QuorumCnxManager类
        QuorumCnxManager qcm = createCnxnManager();
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        if (oldQcm != null) 
            oldQcm.halt();
        
        //创建一个线程
        QuorumCnxManager.Listener listener = qcm.listener;
        if(listener != null)
            //开启线程,底层会去绑定一个端口,通过这个bio的方式进行一个选票的逻辑
            listener.start();
            //选举的逻辑
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            
            fle.start();
            le = fle;
         else 
            LOG.error("Null listener when initializing cnx manager");
        
        break;
    default:
        assert false;
    
    return le;

9,这个算法的逻辑方法FastLeaderElection的如下,主要就是在里面创建了几个链表类型的阻塞队列。阻塞队列就是为了后面加这个消息存放在这几个阻塞队列里面。

public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager)
    this.stop = false;
    this.manager = manager;
    starter(self, manager);

private void starter(QuorumPeer self, QuorumCnxManager manager) 
    this.self = self;
    proposedLeader = -1;
    proposedZxid = -1;
    sendqueue = new LinkedBlockingQueue<ToSend>();
    recvqueue = new LinkedBlockingQueue<Notification>();
    this.messenger = new Messenger(manager);

10,接下来会有一个fle.start();的这个方法,里面的主要方法实现如下

void start()
    this.wsThread.start();
    this.wrThread.start();


Messenger(QuorumCnxManager manager) 
    //发起选票的线程
    this.ws = new WorkerSender(manager);
    this.wsThread = new Thread(this.ws,
            "WorkerSender[myid=" + self.getId() + "]");
    this.wsThread.setDaemon(true);
    //接收选票的线程
    this.wr = new WorkerReceiver(manager);
    this.wrThread = new Thread(this.wr,
            "WorkerReceiver[myid=" + self.getId() + "]");
    this.wrThread.setDaemon(true);

11,接下来回到第五步里面的这个 super.start() 的这个方法,其run方法的主要的核心如下,就是一个具体的投票的一个业务逻辑。主要会判断当前机器的一个状态,是looking状态还是已经选举成功的状态

try 
    while (running) 
        switch (getPeerState()) 
        //默认是looking观望状态,还在找leader
        case LOOKING:
            LOG.info("LOOKING");
            if (Boolean.getBoolean("readonlymode.enabled")) 
                LOG.info("Attempting to start ReadOnlyZooKeeperServer");
                final ReadOnlyZooKeeperServer roZk =
                    new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);
                Thread roZkMgr = new Thread() 
                    public void run() 
                        try 
                            sleep(Math.max(2000, tickTime));
                            if (ServerState.LOOKING.equals(getPeerState())) 
                                roZk.startup();
                            
                         catch (InterruptedException e) 
                          
                         catch (Exception e) 
                           
                        
                    
                ;
                try 
                    roZkMgr.start();
                    reconfigFlagClear();
                    if (shuttingDownLE) 
                        shuttingDownLE = false;
                        startLeaderElection();
                    
                    setCurrentVote(makeLEStrategy().lookForLeader());
                 catch (Exception e) 
                    LOG.warn("Unexpected exception", e);
                    setPeerState(ServerState.LOOKING);
                 finally 
                    roZkMgr.interrupt();
                    roZk.shutdown();
                
             else 
                try 
                   reconfigFlagClear();
                    if (shuttingDownLE) 
                       shuttingDownLE = false;
                       startLeaderElection();
                       
                    //设置一个当前的投票,就是给自己投一票
                    setCurrentVote(makeLEStrategy().lookForLeader());
                 catch (Exception e) 
                    LOG.warn("Unexpected exception", e);
                    setPeerState(ServerState.LOOKING);
                                        
            
            break;
        case OBSERVING:
            try 
                LOG.info("OBSERVING");
                setObserver(makeObserver(logFactory));
                observer.observeLeader();
             catch (Exception e) 
                LOG.warn("Unexpected exception",e );
             finally 
                observer.shutdown();
                setObserver(null);  
               updateServerState();
            
            break;
        case FOLLOWING:
            try 
               LOG.info("FOLLOWING");
                setFollower(makeFollower(logFactory));
                follower.followLeader();
             catch (Exception e) 
               LOG.warn("Unexpected exception",e);
             finally 
               follower.shutdown();
               setFollower(null);
               updateServerState();
            
            break;
        case LEADING:
            LOG.info("LEADING");
            try 
                setLeader(makeLeader(logFactory));
                leader.lead();
                setLeader(null);
             catch (Exception e) 
                LOG.warn("Unexpected exception",e);
             finally 
                if (leader != null) 
                    leader.shutdown("Forcing shutdown");
                    setLeader(null);
                
                updateServerState();
            
            break;
        
        start_fle = Time.currentElapsedTime();
    

12,这个设置投票的方法如下,主要是在这个lookForLeader里面,从这一步开始,就是选票的正式开始

public Vote lookForLeader() throws 以上是关于zookeeper的leader选举原理和底层源码实现超级详解的主要内容,如果未能解决你的问题,请参考以下文章

zookeeper的Leader选举源码解析

Zookeeper选举原理

浅谈Zookeeper集群选举Leader节点源码

浅谈Zookeeper集群选举Leader节点源码

ZOOKEEPER3.3.3源码分析对LEADER选举过程分析的纠正

8.8.ZooKeeper 原理和选举机制