Flink 1.15 Source Code Analysis: Leader Election

Posted by 宝哥大数据 [currently between jobs; referrals welcome]

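Roles

  • LeaderContender (the contender): the component that needs a leader election, such as the Dispatcher or the ResourceManager. It is called back when it is granted leadership or when leadership is revoked.
  • LeaderElectionService (the election service): the service responsible for the election. (a) It is backed by a LeaderElectionDriver, whose concrete implementation can be ZooKeeper, Kubernetes, and so on, and it initializes that driver when it starts; (b) it implements LeaderElectionEventHandler.
  • LeaderElectionDriver (the election driver): the concrete leader-election implementation. ZooKeeperLeaderElectionDriver, for example, implements the LeaderLatchListener callback interface, so it is notified when leadership or the leader node changes, and it then calls the LeaderElectionEventHandler.
  • LeaderElectionEventHandler (the election event handler): the callback for election results. When an instance is elected, it calls the LeaderContender's grantLeadership method, which runs the follow-up logic of that particular component; for example, DispatcherRunner's grantLeadership creates the Dispatcher service and starts DispatcherBootstrap.
  • LeaderRetrievalService (the leader retrieval service): watches for leader changes and passes them on. Its implementation resembles LeaderElectionService, but its purpose differs: it receives the leader's information and notifies a LeaderRetrievalListener.
  • LeaderRetriever (the listener): listens for the leader and implements the LeaderRetrievalListener interface. A concrete implementation is RpcGatewayRetriever, which in its callback obtains the leader's information, creates an Akka connection, and generates an AOP-style proxy instance.
  • RpcGateway: the RPC gateway. Its implementations hide the details of the Akka layer behind an AOP-style proxy, so calling a gateway method directly performs Akka communication.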
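The LeaderRetriever role can be sketched in plain Java. This is a simplified stand-in, not Flink's class: `SimpleLeaderRetrievalListener`, `LeaderInfo`, and `SimpleLeaderRetriever` are hypothetical names, and only the callback shape `notifyLeaderAddress(String, UUID)` follows Flink's LeaderRetrievalListener. Each notification completes (or replaces) a future that callers can wait on before they try to connect to the leader.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical listener interface; only the callback shape follows LeaderRetrievalListener.
interface SimpleLeaderRetrievalListener {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
}

// Immutable pair of leader address and session ID (hypothetical helper type).
final class LeaderInfo {
    final String address;
    final UUID sessionId;

    LeaderInfo(String address, UUID sessionId) {
        this.address = address;
        this.sessionId = sessionId;
    }
}

// Simplified retriever: callers wait on a future that each notification completes or replaces.
class SimpleLeaderRetriever implements SimpleLeaderRetrievalListener {
    private final AtomicReference<CompletableFuture<LeaderInfo>> current =
            new AtomicReference<>(new CompletableFuture<>());

    @Override
    public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId) {
        if (leaderAddress == null || leaderSessionId == null) {
            // Leader lost: new callers should wait for the next leader, but a still-pending
            // future is kept so that existing waiters are not orphaned.
            current.updateAndGet(f -> f.isDone() ? new CompletableFuture<LeaderInfo>() : f);
            return;
        }
        LeaderInfo info = new LeaderInfo(leaderAddress, leaderSessionId);
        // Complete the pending future if nobody has completed it yet ...
        if (!current.get().complete(info)) {
            // ... otherwise it already holds an older leader, so replace it.
            current.set(CompletableFuture.completedFuture(info));
        }
    }

    CompletableFuture<LeaderInfo> getLeaderFuture() {
        return current.get();
    }
}
```

Handing out a future rather than a plain field lets a caller that starts before any leader exists simply block or chain on it.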

1. LeaderContender

The LeaderContender interface is used in leader election and represents a role that competes for leadership.
Its implementations include:

  • JobMasterServiceLeadershipRunner
  • ResourceManager
  • DefaultDispatcherRunner
  • WebMonitorEndpoint

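The interface contains two important methods:

  1.  grantLeadership: the callback invoked when the contender wins the election.

  2.  revokeLeadership: the callback invoked when the contender goes from leader to non-leader.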
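A minimal sketch of the contender side, assuming a simplified interface: `SimpleLeaderContender` and `RecordingContender` are hypothetical names, and Flink's real LeaderContender has more methods (for example, error handling). The session ID passed to grantLeadership identifies one leadership term, which lets a contender tell the current term from a stale one.

```java
import java.util.UUID;

// Simplified stand-in for LeaderContender (hypothetical name).
interface SimpleLeaderContender {
    // Called when this contender has been elected; the session ID identifies the term.
    void grantLeadership(UUID leaderSessionId);

    // Called when this contender loses leadership.
    void revokeLeadership();
}

// Toy contender that only records which leadership session it currently holds.
class RecordingContender implements SimpleLeaderContender {
    private UUID currentSession; // null while not leader
    private int grants;

    @Override
    public synchronized void grantLeadership(UUID leaderSessionId) {
        currentSession = leaderSessionId;
        grants++;
        // A real contender would start its leader-only service here.
    }

    @Override
    public synchronized void revokeLeadership() {
        currentSession = null;
        // A real contender would stop the service it started in grantLeadership.
    }

    synchronized boolean isLeader() {
        return currentSession != null;
    }

    synchronized UUID session() {
        return currentSession;
    }

    synchronized int grantCount() {
        return grants;
    }
}
```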

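A service that needs a leader election passes itself as the contender to its leaderElectionService when it starts. The following snippet is JobMasterServiceLeadershipRunner#start: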

    @Override
    public void start() throws Exception {
        LOG.debug("Start leadership runner for job {}.", getJobID());
        // register this runner as the contender; election callbacks come back to it
        leaderElectionService.start(this);
    }

2. LeaderElectionService

The leader election service, taking its implementation DefaultLeaderElectionService as the example.

2.1 LeaderElectionService

LeaderElectionService mainly provides the calls for joining the election and for querying its result.
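To make "join the election" and "query the result" concrete, here is a toy in-memory service. The interface and class names are hypothetical, and the method set (start, stop, confirmLeadership, hasLeadership) is a simplified model of what the text describes, not a copy of Flink's interface. With no competitors, start grants leadership immediately, and hasLeadership only answers true for the session ID that was actually issued.

```java
import java.util.UUID;
import java.util.function.Consumer;

// Simplified shape of a leader election service (hypothetical names).
interface SimpleLeaderElectionService {
    // Join the election; onGrant receives the session ID if this instance wins.
    void start(Consumer<UUID> onGrant);

    // Leave the election and drop any leadership.
    void stop();

    // The contender confirms it accepted leadership for the given session.
    void confirmLeadership(UUID sessionId, String address);

    // Query: is the given session the currently issued leadership?
    boolean hasLeadership(UUID sessionId);
}

// In-memory implementation with no competitors: whoever starts wins.
class InMemoryLeaderElectionService implements SimpleLeaderElectionService {
    private boolean running;
    private UUID issuedSessionId;
    private String confirmedAddress;

    @Override
    public void start(Consumer<UUID> onGrant) {
        UUID granted;
        synchronized (this) {
            running = true;
            issuedSessionId = UUID.randomUUID(); // a new term gets a fresh session ID
            granted = issuedSessionId;
        }
        // Call back outside the lock, since callbacks may call into the service again.
        onGrant.accept(granted);
    }

    @Override
    public synchronized void stop() {
        running = false;
        issuedSessionId = null;
        confirmedAddress = null;
    }

    @Override
    public synchronized void confirmLeadership(UUID sessionId, String address) {
        // Ignore confirmations that belong to an earlier or unknown session.
        if (running && sessionId.equals(issuedSessionId)) {
            confirmedAddress = address;
        }
    }

    @Override
    public synchronized boolean hasLeadership(UUID sessionId) {
        return running && sessionId.equals(issuedSessionId);
    }

    synchronized String confirmedAddress() {
        return confirmedAddress;
    }
}
```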

2.2 LeaderElectionEventHandler (the election event handler)

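The callback for election results. When an instance is elected, it calls the LeaderContender's grantLeadership method, which runs the follow-up logic of that particular component; for example, DispatcherRunner's grantLeadership creates the Dispatcher service and starts DispatcherBootstrap.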

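When the service starts taking part in the election (DefaultLeaderElectionService#start), it creates a leaderElectionDriver through the election driver factory. Through this factory, Flink decouples the details of ZooKeeper's CuratorFramework from Flink itself.

The service also passes itself to the leaderElectionDriver as the LeaderElectionEventHandler.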

    @Override
    public final void start(LeaderContender contender) throws Exception {
        checkNotNull(contender, "Contender must not be null.");
        Preconditions.checkState(leaderContender == null, "Contender was already set.");

        synchronized (lock) {
            running = true;
            leaderContender = contender;
            // the factory hides the concrete driver (ZooKeeper, Kubernetes, ...);
            // "this" is passed in as the LeaderElectionEventHandler
            leaderElectionDriver =
                    leaderElectionDriverFactory.createLeaderElectionDriver(
                            this,
                            new LeaderElectionFatalErrorHandler(),
                            leaderContender.getDescription());
            LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);
        }
    }
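The factory decoupling can be sketched with simplified, hypothetical interfaces (`EventHandler`, `ElectionDriver`, `ElectionDriverFactory`, `ElectionService`). The service depends only on the factory interface and passes itself in as the event handler, as DefaultLeaderElectionService#start does above; a test factory that grants leadership immediately can then stand in for a ZooKeeper-backed one.

```java
import java.util.UUID;

// Hypothetical, minimal interfaces; Flink's real ones carry more parameters.
interface EventHandler {
    void onGrantLeadership(UUID sessionId);
}

interface ElectionDriver {
    void close();
}

interface ElectionDriverFactory {
    ElectionDriver create(EventHandler handler, String contenderDescription);
}

// The service only knows the factory interface, so backend details stay out of it.
class ElectionService implements EventHandler {
    private final ElectionDriverFactory factory;
    private ElectionDriver driver;
    private UUID grantedSession;

    ElectionService(ElectionDriverFactory factory) {
        this.factory = factory;
    }

    synchronized void start(String contenderDescription) {
        if (driver != null) {
            throw new IllegalStateException("Contender was already set.");
        }
        // Pass "this" as the event handler so the driver can report election results.
        driver = factory.create(this, contenderDescription);
    }

    @Override
    public synchronized void onGrantLeadership(UUID sessionId) {
        grantedSession = sessionId;
    }

    synchronized UUID grantedSession() {
        return grantedSession;
    }
}

// Test factory: its driver grants leadership at once instead of talking to ZooKeeper.
class ImmediateGrantDriverFactory implements ElectionDriverFactory {
    String lastDescription;

    @Override
    public ElectionDriver create(EventHandler handler, String contenderDescription) {
        lastDescription = contenderDescription;
        handler.onGrantLeadership(UUID.randomUUID());
        return () -> { }; // nothing to release in this toy driver
    }
}
```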

3. LeaderElectionDriver

3.1 LeaderElectionDriver

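LeaderElectionDriver is the concrete leader-election implementation. Taking ZooKeeperLeaderElectionDriver as the example: it implements the LeaderLatchListener callback interface, so it is notified when leadership or the leader node changes, and it then calls the LeaderElectionEventHandler.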

3.2 LeaderLatchListener


public interface LeaderLatchListener {
    // callback invoked when this latch wins the election
    void isLeader();

    // callback invoked when this latch loses leadership
    void notLeader();
}
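The bridge from these two callbacks to the election event handler can be sketched as follows. `LatchListener`, `ElectionEventHandler`, `LatchToHandlerAdapter`, and `ToyLatch` are hypothetical stand-ins; the assumption, in line with the role description at the top, is that the driver reacts to isLeader by granting leadership with a fresh session ID and to notLeader by revoking it. The toy latch reports only actual state changes, so a repeated "leader" signal produces a single grant.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Mirrors the two latch callbacks shown above.
interface LatchListener {
    void isLeader();

    void notLeader();
}

// Hypothetical, simplified election event handler.
interface ElectionEventHandler {
    void onGrantLeadership(UUID newLeaderSessionId);

    void onRevokeLeadership();
}

// Adapter in the spirit of the ZooKeeper driver: latch callbacks become election events,
// and every new term gets a fresh session ID.
class LatchToHandlerAdapter implements LatchListener {
    private final ElectionEventHandler handler;

    LatchToHandlerAdapter(ElectionEventHandler handler) {
        this.handler = handler;
    }

    @Override
    public void isLeader() {
        handler.onGrantLeadership(UUID.randomUUID());
    }

    @Override
    public void notLeader() {
        handler.onRevokeLeadership();
    }
}

// Toy latch whose leadership a test can flip; it notifies listeners only on changes.
class ToyLatch {
    private final List<LatchListener> listeners = new ArrayList<>();
    private boolean leader;

    void addListener(LatchListener listener) {
        listeners.add(listener);
    }

    void setLeadership(boolean newLeader) {
        if (newLeader == leader) {
            return; // no change, no callback
        }
        leader = newLeader;
        for (LatchListener l : listeners) {
            if (newLeader) {
                l.isLeader();
            } else {
                l.notLeader();
            }
        }
    }
}
```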

3.3 Taking the ZooKeeperLeaderElectionDriver implementation as an example

The driver is created through a factory:

    // created through the factory
    @Override
    public ZooKeeperLeaderElectionDriver createLeaderElectionDriver(
            LeaderElectionEventHandler leaderEventHandler, // the leaderElectionService passes itself in here
            FatalErrorHandler fatalErrorHandler,
            String leaderContenderDescription)
            throws Exception {
        return new ZooKeeperLeaderElectionDriver(
                client, path, leaderEventHandler, fatalErrorHandler, leaderContenderDescription);
    }

ZooKeeperLeaderElectionDriver wraps CuratorFramework as its election framework:

    /**
     * Creates a ZooKeeperLeaderElectionDriver object.
     *
     * @param client Client which is connected to the ZooKeeper quorum
     * @param path ZooKeeper node path for the leader election
     * @param leaderElectionEventHandler Event handler for processing leader change events
     * @param fatalErrorHandler Fatal error handler
     * @param leaderContenderDescription Leader contender description
     */
    public ZooKeeperLeaderElectionDriver(
            CuratorFramework client,
            String path,
            LeaderElectionEventHandler leaderElectionEventHandler, // the DefaultLeaderElectionService
            FatalErrorHandler fatalErrorHandler,
            String leaderContenderDescription)
            throws Exception {
        checkNotNull(path);
        this.client = checkNotNull(client);
        this.connectionInformationPath = ZooKeeperUtils.generateConnectionInformationPath(path);

        this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler);

        this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
        this.leaderContenderDescription = checkNotNull(leaderContenderDescription);

        leaderLatchPath = ZooKeeperUtils.generateLeaderLatchPath(path);
        leaderLatch = new LeaderLatch(client, leaderLatchPath);
        this.cache =
                ZooKeeperUtils.createTreeCache(
                        client,
                        connectionInformationPath,
                        this::retrieveLeaderInformationFromZooKeeper);

        running = true;
        // start the election
        leaderLatch.addListener(this); // register this driver as a LeaderLatchListener
        leaderLatch.start();

        cache.start();

        client.getConnectionStateListenable().addListener(listener);
    }

3.3.1 LeaderLatch

3.3.1.1 Creating the LeaderLatch

        // create the leaderLatch
        leaderLatch = new LeaderLatch(client, leaderLatchPath);

3.3.1.2 Creating the TreeCache

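Setting up the listener
A native ZooKeeper watch fires only once. Curator's listener API wraps the native API and re-registers watches internally, so the same listener keeps being notified.
Curator provides three cache types:

  • NodeCache: watches creation, deletion, and updates of a single node.
  • PathChildrenCache: watches creation, deletion, and updates of the direct children of a node.
  • TreeCache: combines NodeCache and PathChildrenCache; it watches not only the node itself but also changes to any descendant, at any depth.
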
        this.cache =
                ZooKeeperUtils.createTreeCache(
                        client,
                        connectionInformationPath,
                        this::retrieveLeaderInformationFromZooKeeper);
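The difference between the three cache types is which paths trigger a callback. The toy registry below (hypothetical, not Curator's API; the paths are made-up examples) models only that matching rule: NODE matches the watched path itself, CHILDREN matches its direct children, and TREE matches the path and every descendant.

```java
import java.util.ArrayList;
import java.util.List;

// Which changes a watcher wants, mirroring NodeCache / PathChildrenCache / TreeCache.
enum WatchScope { NODE, CHILDREN, TREE }

// Toy model: records, per watcher, which changed paths it would have been told about.
class ToyWatchRegistry {
    private static final class Watch {
        final String path;
        final WatchScope scope;
        final List<String> seen;

        Watch(String path, WatchScope scope, List<String> seen) {
            this.path = path;
            this.scope = scope;
            this.seen = seen;
        }
    }

    private final List<Watch> watches = new ArrayList<>();

    void watch(String path, WatchScope scope, List<String> sink) {
        watches.add(new Watch(path, scope, sink));
    }

    // Simulate a create/update/delete at changedPath and notify every matching watcher.
    void change(String changedPath) {
        for (Watch w : watches) {
            if (matches(w.path, w.scope, changedPath)) {
                w.seen.add(changedPath);
            }
        }
    }

    static boolean matches(String watched, WatchScope scope, String changed) {
        switch (scope) {
            case NODE:
                return changed.equals(watched);
            case CHILDREN:
                return parentOf(changed).equals(watched);
            case TREE:
                return changed.equals(watched) || changed.startsWith(watched + "/");
            default:
                throw new AssertionError(scope);
        }
    }

    static String parentOf(String path) {
        int i = path.lastIndexOf('/');
        return i <= 0 ? "/" : path.substring(0, i);
    }
}
```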

    private void retrieveLeaderInformationFromZooKeeper() throws Exception {
        // only the current leader reconciles the stored connection information
        if (leaderLatch.hasLeadership()) {
            ChildData childData = cache.getCurrentData(connectionInformationPath);
            // call back onLeaderInformationChange; leaderElectionEventHandler is the DefaultLeaderElectionService
            leaderElectionEventHandler.onLeaderInformationChange(
                    childData == null
                            ? LeaderInformation.empty()
                            : ZooKeeperUtils.readLeaderInformation(childData.getData()));
        }
    }

This calls back onLeaderInformationChange; here leaderElectionEventHandler is the DefaultLeaderElectionService:

// org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService#onLeaderInformationChange

    @Override
    @GuardedBy("lock")
    public void onLeaderInformationChange(LeaderInformation leaderInformation) {
        synchronized (lock) {
            if (running) {
                if (!confirmedLeaderInformation.isEmpty()) {
                    final LeaderInformation confirmedLeaderInfo = this.confirmedLeaderInformation;
                    if (leaderInformation.isEmpty()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(
                                    "Writing leader information by {} since the external storage is empty.",
                                    leaderContender.getDescription());
                        }
                        // main logic: write the confirmed leader information to ZooKeeper
                        leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo);
                    } else if (!leaderInformation.equals(confirmedLeaderInfo)) {
                        // the data field does not correspond to the expected leader information
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(
                                    "Correcting leader information by {}.",
                                    leaderContender.getDescription());
                        }
                        leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo);
                    }
                }
            }
        }
    }
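The decision inside onLeaderInformationChange reduces to a pure function: if this instance has confirmed leader information and the store holds something else (or nothing), write the confirmed value back. A sketch using a hypothetical `LeaderInfoValue` type in place of LeaderInformation; the `running` check and the logging are left out.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in for LeaderInformation: a session ID plus an address.
final class LeaderInfoValue {
    static final LeaderInfoValue EMPTY = new LeaderInfoValue(null, null);

    final String sessionId;
    final String address;

    LeaderInfoValue(String sessionId, String address) {
        this.sessionId = sessionId;
        this.address = address;
    }

    boolean isEmpty() {
        return sessionId == null && address == null;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof LeaderInfoValue)) {
            return false;
        }
        LeaderInfoValue other = (LeaderInfoValue) o;
        return Objects.equals(sessionId, other.sessionId) && Objects.equals(address, other.address);
    }

    @Override
    public int hashCode() {
        return Objects.hash(sessionId, address);
    }
}

final class LeaderInfoReconciler {
    private LeaderInfoReconciler() {}

    // Returns the value that should be written back, or empty if nothing needs writing.
    static Optional<LeaderInfoValue> toWrite(LeaderInfoValue confirmed, LeaderInfoValue stored) {
        if (confirmed.isEmpty()) {
            return Optional.empty(); // leadership not confirmed yet: nothing to enforce
        }
        if (stored.isEmpty()) {
            return Optional.of(confirmed); // store was cleared: restore our information
        }
        if (!stored.equals(confirmed)) {
            return Optional.of(confirmed); // store holds something else: correct it
        }
        return Optional.empty(); // store already matches
    }
}
```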

Writing the leader information to ZooKeeper:

    /** Writes the current leader's address as well the given leader session ID to ZooKeeper. */
    @Override
    public void writeLeaderInformation(LeaderInformation leaderInformation) {
        assert (running);
        // this method does not have to be synchronized because the curator framework client
        // is thread-safe. We do not write the empty data to ZooKeeper here, because
        // check-leadership-and-update is not a transactional operation. We may wrongly clear
        // the data written by new leader.
        if (leaderInformation.isEmpty()) {
            return;
        }

        try {
            ZooKeeperUtils.writeLeaderInformationToZooKeeper(
                    leaderInformation,
                    client,
                    leaderLatch::hasLeadership,
                    connectionInformationPath);

            if (LOG.isDebugEnabled()) {
                LOG.debug("Successfully wrote leader information: {}.", leaderInformation);
            }
        } catch (Exception e) {
            fatalErrorHandler.onFatalError(
                    new LeaderElectionException(
                            "Could not write leader address and leader session ID to "
                                    + "ZooKeeper.",
                            e));
        }
    }
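The two safeguards in writeLeaderInformation, skipping empty data and writing only while holding leadership, can be sketched against a toy in-memory store. `ToyStore` and `GuardedLeaderWriter` are hypothetical; the leadership check is passed in as a BooleanSupplier, just as `leaderLatch::hasLeadership` is passed above, and everything ZooKeeper-specific is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BooleanSupplier;

// Toy key-value store standing in for ZooKeeper (hypothetical).
class ToyStore {
    final Map<String, String> data = new HashMap<>();
}

final class GuardedLeaderWriter {
    private final ToyStore store;
    private final BooleanSupplier hasLeadership;
    private final String path;

    GuardedLeaderWriter(ToyStore store, BooleanSupplier hasLeadership, String path) {
        this.store = store;
        this.hasLeadership = hasLeadership;
        this.path = path;
    }

    // Returns true if the value was written.
    boolean write(String leaderInfo) {
        // Never write empty data: "check leadership, then update" is not atomic, so clearing
        // could wipe out information a newer leader has just written.
        if (leaderInfo == null || leaderInfo.isEmpty()) {
            return false;
        }
        // Only the current leadership holder may write.
        if (!hasLeadership.getAsBoolean()) {
            return false;
        }
        store.data.put(path, leaderInfo);
        return true;
    }
}
```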

3.3.1.3 Starting the election

        // add the ZooKeeperLeaderElectionDriver to the leaderLatch's listeners
        leaderLatch.addListener(this);
        leaderLatch.start();

// org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch#start
    public void start() throws Exception {
        // a latch can be started only once: LATENT -> STARTED
        Preconditions.checkState(this.state.compareAndSet(LeaderLatch.State.LATENT, LeaderLatch.State.STARTED), "Cannot be started more than once");
        // run internalStart() once the client is connected
        this.startTask.set(AfterConnectionEstablished.execute(this.client, new Runnable() {
            public void run() {
                try {
                    LeaderLatch.this.internalStart();
                } finally {
                    LeaderLatch.this.startTask.set((Object) null);
                }
            }
        }));
    }


public class AfterConnectionEstablished {
    private static final Logger log = LoggerFactory.getLogger(AfterConnectionEstablished.class);

    public static Future<?> execute(final CuratorFramework client, final Runnable runAfterConnection) throws Exception {
        // a dedicated single-thread executor for this one task
        final ExecutorService executor =
            ThreadUtils.newSingleThreadExecutor(ThreadUtils.getProcessName(runAfterConnection.getClass()));

        Runnable internalCall = new Runnable() {
            public void run() {
                try {
                    // wait for the ZooKeeper connection, then run the task
                    client.blockUntilConnected();
                    runAfterConnection.run();
                } catch (Exception var5) {
                    ThreadUtils.checkInterrupted(var5);
                    AfterConnectionEstablished.log.error("An error occurred blocking until a connection is available", var5);
                } finally {
                    executor.shutdown();
                }
            }
        };
        return executor.submit(internalCall);
    }

    private AfterConnectionEstablished() {
    }
}
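The pattern in AfterConnectionEstablished (wait for the connection on a dedicated single-thread executor, run the task once, then shut the executor down) can be reproduced with the JDK alone. `ToyClient` and `RunAfterConnect` are hypothetical; a CountDownLatch stands in for blockUntilConnected, and the sketch returns a CompletableFuture rather than a plain Future so callers can join without checked exceptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical client whose connection is established asynchronously.
class ToyClient {
    private final CountDownLatch connected = new CountDownLatch(1);

    void markConnected() {
        connected.countDown();
    }

    void blockUntilConnected() throws InterruptedException {
        connected.await();
    }
}

final class RunAfterConnect {
    private RunAfterConnect() {}

    // Waits for the connection on a dedicated thread, runs the task once,
    // then shuts the executor down so the thread does not linger.
    static CompletableFuture<Void> execute(ToyClient client, Runnable task) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        return CompletableFuture.runAsync(() -> {
            try {
                client.blockUntilConnected();
                task.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            } finally {
                executor.shutdown();
            }
        }, executor);
    }
}
```

Returning immediately and doing the waiting on a separate thread is what lets LeaderLatch#start return before ZooKeeper is reachable.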

internalStart is then called:

    private synchronized void internalStart() {
        if (this.state.get() == LeaderLatch.State.STARTED) {
            this.client.getConnectionStateListenable().addListener(this.listener);

            try {
                // main logic
                this.reset();
            } catch (Exception var2) {
                ThreadUtils.checkInterrupted(var2);
                this.log.error("An error occurred checking resetting leadership.", var2);
            }
        }
    }

reset

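Asynchronous API
Curator provides an asynchronous API and introduces the BackgroundCallback interface to handle the result the server returns after an asynchronous call. An important argument of the BackgroundCallback callback is the CuratorEvent, which contains the event type, the result code, and the details of the node.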

    @VisibleForTesting
    void reset() throws Exception {
        // reset the leadership state
        this.setLeadership(false);
        this.setNode((String) null);

        BackgroundCallback callback = new BackgroundCallback() {
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                if (LeaderLatch.this.debugResetWaitLatch != null) {
                    LeaderLatch.this.debugResetWaitLatch.await();
                    LeaderLatch.this.debugResetWaitLatch = null;
                }

                if (event.getResultCode() == Code.OK.intValue()) {
                    // remember the name of the node we just created
                    LeaderLatch.this.setNode(event.getName());
                    if (LeaderLatch.this.state.get() == LeaderLatch.State.CLOSED) {
                        LeaderLatch.this.setNode((String) null);
                    } else {
                        // main logic: list the latch's children to decide leadership
                        LeaderLatch.this.getChildren();
                    }
                } else {
                    LeaderLatch.this.log.error("getChildren() failed. rc = " + event.getResultCode());
                }
            }
        };
        // create an EPHEMERAL_SEQUENTIAL "latch-" node in the background; the callback runs afterwards
        ((ErrorListenerPathAndBytesable) ((ACLBackgroundPathAndBytesable) this.client.create()
                .creatingParentContainersIfNeeded()
                .withProtection()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL))
                .inBackground(callback))
                .forPath(ZKPaths.makePath(this.latchPath, "latch-"), LeaderSelector.getIdBytes(this.id));
    }
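reset() relies on two behaviours: ZooKeeper appends an increasing, zero-padded 10-digit counter to the names of EPHEMERAL_SEQUENTIAL nodes, and Curator's inBackground delivers results to a callback instead of returning them, so the next step (getChildren) is issued from inside the callback. The toy namespace below imitates both. Its API is hypothetical, not Curator's, and it omits the ephemeral lifetime and the random prefix that withProtection() adds to node names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

// Toy asynchronous namespace (hypothetical): sequential names plus callback-style results.
class ToyAsyncNamespace {
    static final int OK = 0;

    private final List<String> children = new ArrayList<>();
    private final ExecutorService background = Executors.newSingleThreadExecutor();
    private int counter;

    // Roughly "create a sequential node in the background": the callback gets (resultCode, name).
    void createSequential(String prefix, BiConsumer<Integer, String> callback) {
        background.execute(() -> {
            String name;
            synchronized (this) {
                name = prefix + String.format("%010d", counter++);
                children.add(name);
            }
            callback.accept(OK, name);
        });
    }

    // Roughly "list children in the background": the callback gets (resultCode, snapshot).
    void getChildren(BiConsumer<Integer, List<String>> callback) {
        background.execute(() -> {
            List<String> snapshot;
            synchronized (this) {
                snapshot = new ArrayList<>(children);
            }
            callback.accept(OK, snapshot);
        });
    }

    void shutdown() {
        background.shutdown();
    }
}
```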

Next, let us look at the main logic, LeaderLatch.this.getChildren();

    private void getChildren() throws Exception {
        BackgroundCallback callback = new BackgroundCallback() {
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                if (event.getResultCode() == Code.OK.intValue()) {
                    // check whether our node now holds leadership
                    LeaderLatch.this.checkLeadership(event.getChildren());
                }
            }
        };
        ((ErrorListenerPathable) this.client.getChildren()
                .inBackground(callback))
                .forPath(ZKPaths.makePath(this.latchPath, (String) null));
    }

Inside checkLeadership, the call to focus on is this.setLeadership(true);

    private void checkLeadership(List<String> children) throws Exception {
        if (this.debugCheckLeaderShipLatch != null) {
            this.debugCheckLeaderShipLatch.await();
        }

        final String localOurPath = (String) this.ourPath.get();
        List<String>
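The excerpt stops inside checkLeadership. The decision it makes follows the standard ZooKeeper leader-latch recipe: sort the latch children by sequence number; the owner of the lowest number is the leader, and every other participant watches only the node immediately before its own, so a deletion wakes one waiter instead of all of them. A simplified sketch of that decision (`LatchDecision` is a hypothetical name, not Curator's code):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Simplified leader-latch decision over a snapshot of the latch's children.
final class LatchDecision {
    private LatchDecision() {}

    // Sequential node names end in a 10-digit counter; any prefix is ignored for ordering.
    static long sequenceOf(String node) {
        return Long.parseLong(node.substring(node.length() - 10));
    }

    // Empty result: ourNode has the lowest sequence number, so we are the leader.
    // Otherwise: the node just before ours, which is the only one we need to watch.
    static Optional<String> nodeToWatch(List<String> children, String ourNode) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingLong(LatchDecision::sequenceOf));
        int ourIndex = sorted.indexOf(ourNode);
        if (ourIndex < 0) {
            // Our node vanished (e.g. session expiry); the latch must re-create it.
            throw new IllegalStateException("own node not found: " + ourNode);
        }
        return ourIndex == 0 ? Optional.empty() : Optional.of(sorted.get(ourIndex - 1));
    }
}
```

In Curator's LeaderLatch, the leader case ends in setLeadership(true), and a missing own node leads to another reset().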
