Flink HA

Posted andyhe

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink HA相关的知识,希望对你有一定的参考价值。

standalone 模式的高可用

技术分享图片

部署

flink 使用zookeeper协调多个运行的jobmanager,所以要启用flink HA 你需要把高可用模式设置成zookeeper,配置zookeeper相关参数,并且在masters配置文件中配置所有的jobmanager主机地址和web UI 端口

在一下例子中,我们配置node1,node2,node3三个jobmanager

  1. 编辑conf/masters
    node1:8081 node2:8081 node3:8081
  2. 编辑conf/flink-conf.yaml

    high-availability: zookeeper
    high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
    high-availability.zookeeper.path.root: /flink
    high-availability.cluster-id: /cluster_one  high-availability.storageDir: hdfs:///flink/recovery
  3. 启动集群

    bin/start-cluster.sh

yarn 模式的高可用?

yarn 模式中不会同时运行多个jobmanager(ApplicationMaster) instances,而是只运行一个,如果ApplicationMaster异常会依靠Yarn机制进行重启.

部署

  1. 编辑yarn-site.xml添加如下配置

    <property>
        <name>yarn.resourcemanager.am.max-attempts</name>
        <value>4</value>
        <description>
        The maximum number of application master execution attempts.
        </description>
    </property>

    设置application master 最大重启次数

  2. 编辑conf/flink-conf.yaml

    high-availability: zookeeper
    high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
    high-availability.storageDir: hdfs:///flink/recovery
    high-availability.zookeeper.path.root: /flink
    yarn.application-attempts: 10

    配置HA模式为zookeeper,并设置应用的最大重启次数

  3. 启动一个yarn session

    bin/yarn-session.sh -n 2

原理

采用curator库中的LeaderLatch实现leader选举。不了解的同学可以移步curator相关文档LeaderLatch
在zookeeper中生成的主要目录结构如下图:
技术分享图片

涉及到的主要类:

  • 选举:

    首先我们看一下JobManager的构造函数:
    技术分享图片

    注意它的构造函数需要LeaderElectionService对象作为参数以及它本身实现了LeaderContender接口。那么LeaderElectionService是怎么创建的?其实就是根据high-availability: zookeeper此配置项,由HighAvailabilityServicesUtils工具类的createAvailableOrEmbeddedServices方法创建HighAvailabilityServices对象然后通过其getJobManagerLeaderElectionService方法创建:

    public static HighAvailabilityServices createAvailableOrEmbeddedServices(
        Configuration config,
        Executor executor) throws Exception {
        HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(config);
    
        switch (highAvailabilityMode) {
            case NONE:
                return new EmbeddedHaServices(executor);
    
            case ZOOKEEPER:
                BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
    
                return new ZooKeeperHaServices(
                    ZooKeeperUtils.startCuratorFramework(config),
                    executor,
                    config,
                    blobStoreService);
    
            default:
                throw new Exception("High availability mode " + highAvailabilityMode + " is not supported.");
        }
    }
    public static ZooKeeperLeaderElectionService createLeaderElectionService(
        final CuratorFramework client,
        final Configuration configuration,
        final String pathSuffix)
    {
        final String latchPath = configuration.getString(
            HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH) + pathSuffix;
        final String leaderPath = configuration.getString(
            HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH) + pathSuffix;
    
        return new ZooKeeperLeaderElectionService(client, latchPath, leaderPath);
    }

    我们再了解一下ZooKeeperLeaderElectionService
    技术分享图片

    以及LeaderContender
    技术分享图片

    ZooKeeperLeaderElectionService类主要管理namespace下的两个路径,即latchPath(/leaderlatch)和leaderPath(/leader),latchPath用来进行leader选举,leaderPath存储选举出的leader的地址和UUID。

    LeaderContender类用于当leader发生改变时,收到相应的通知以进行相关业务处理。比如:自己变成leader时,进行job恢复;当自己被撤销leader时,断开注册的TaskManager。

    JobManagerpreStart方法中会调用ZooKeeperLeaderElectionServicestart方法注册LeaderLatch(/leaderlatch)和NodeCache(/leader)的监听器。如果某个LeaderLatch被选为leader,则对应的ZooKeeperLeaderElectionService对象的isLeader方法会被回调,从而调用LeaderContender->grantLeadership()通知被选中的竞选者(此处为JobManager),然后JobManager会调用LeaderElectionService->confirmLeaderSessionID()把被选中的leader的相关信息写入到/leader目录下,并异步进行job恢复工作。
    NodeCache(/leader)的监听器监听写入数据的变化,并具备纠错功能。

    public void isLeader() {
        synchronized (lock) {
            if (running) {
                issuedLeaderSessionID = UUID.randomUUID();
                confirmedLeaderSessionID = null;
    
                if (LOG.isDebugEnabled()) {
                    LOG.debug(
                        "Grant leadership to contender {} with session ID {}.",
                        leaderContender.getAddress(),
                        issuedLeaderSessionID);
                }
    
                leaderContender.grantLeadership(issuedLeaderSessionID);
            } else {
                LOG.debug("Ignoring the grant leadership notification since the service has " +
                    "already been stopped.");
            }
        }
    }

    leader重新选举后需要恢复提交的Job以及恢复相应job的checkpoint。 这就涉及到JobManager构造函数图示中圈红的SubmittedJobGraphStoreCheckpointRecoveryFactory这两个类,我们后边专门进行详细讲解。

  • 查询:
    技术分享图片

    ZooKeeperLeaderRetrievalService通过监听/flink/{cluster_id}/leader/{default_jobid}/job_manager_lock目录的变化,读取该目录下的数据然后通过LeaderRetrievalListenernotifyLeaderAddress方法通知实现该接口的对象。比如更新FlinkMiniClusterleaderGateway

以上是关于Flink HA的主要内容,如果未能解决你的问题,请参考以下文章

Flink 1.2 没有在 HA Cluster 模式下启动

Flink HA

flink-1.12.2 ha 集群配置

flink-1.12.2 ha 集群配置

Flink的standAlone模式的HA环境

搭建高可用的flink JobManager HA