Flink HA
Posted by andyhe
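High availability in standalone mode
Deployment
Flink uses ZooKeeper to coordinate multiple running JobManagers. To enable Flink HA, you need to:
- set the high-availability mode to `zookeeper`;
- configure the ZooKeeper-related parameters;
- list the host address and web UI port of every JobManager in the `masters` configuration file.

In the following example, we configure three JobManagers: node1, node2 and node3.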
Edit `conf/masters`:

```
node1:8081
node2:8081
node3:8081
```
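Each entry pairs a JobManager host with the port of its web UI. The sketch below splits such entries into host/port pairs to illustrate the format. `MastersFile` and its `parse` method are hypothetical helpers written for this article and are not part of Flink. Flink's own start scripts read this file directly.

```java
import java.util.ArrayList;
import java.util.List;

public class MastersFile {
    // One JobManager entry: the host to start it on and its web UI port.
    record Master(String host, int port) {}

    // Parses "host:port" entries separated by any whitespace (spaces or newlines).
    static List<Master> parse(String content) {
        List<Master> masters = new ArrayList<>();
        for (String entry : content.trim().split("\\s+")) {
            if (entry.isEmpty()) {
                continue; // empty input yields a single empty token
            }
            int colon = entry.lastIndexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("Expected host:port but got: " + entry);
            }
            String host = entry.substring(0, colon);
            int port = Integer.parseInt(entry.substring(colon + 1));
            masters.add(new Master(host, port));
        }
        return masters;
    }

    public static void main(String[] args) {
        for (Master m : parse("node1:8081\nnode2:8081\nnode3:8081")) {
            System.out.println(m.host() + " -> web UI port " + m.port());
        }
    }
}
```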
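Edit `conf/flink-conf.yaml`:

```yaml
high-availability: zookeeper
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
high-availability.zookeeper.path.root: /flink
# should be unique for every cluster that shares the ZooKeeper quorum
high-availability.cluster-id: /cluster_one
# JobManager metadata is persisted here; ZooKeeper only stores pointers to it
high-availability.storageDir: hdfs:///flink/recovery
```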
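`high-availability.zookeeper.path.root` and `high-availability.cluster-id` are combined into the ZooKeeper path under which this cluster keeps its nodes, here `/flink/cluster_one`. This is why clusters that share one quorum should use different cluster IDs. The sketch below shows the joining step with slash normalization. `ZkNamespace.join` is a hypothetical helper written for illustration, and its normalization rules are an assumption rather than Flink's code.

```java
public class ZkNamespace {
    // Normalizes one path segment: ensures a leading '/', drops trailing '/'.
    static String normalize(String segment) {
        String s = segment.startsWith("/") ? segment : "/" + segment;
        while (s.length() > 1 && s.endsWith("/")) {
            s = s.substring(0, s.length() - 1);
        }
        return s;
    }

    // Joins high-availability.zookeeper.path.root and high-availability.cluster-id.
    static String join(String root, String clusterId) {
        return normalize(root) + normalize(clusterId);
    }

    public static void main(String[] args) {
        System.out.println(join("/flink", "/cluster_one")); // /flink/cluster_one
        System.out.println(join("flink/", "cluster_two"));  // /flink/cluster_two
    }
}
```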
Start the cluster:

```bash
bin/start-cluster.sh
```
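High availability in YARN mode
In YARN mode, Flink does not run several JobManager (ApplicationMaster) instances at the same time. It runs only one, and if the ApplicationMaster fails, YARN's own mechanism restarts it.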
Deployment
Edit `yarn-site.xml` and add the following property, which sets the maximum number of ApplicationMaster attempts:

```xml
<property>
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>4</value>
  <description>
    The maximum number of application master execution attempts.
  </description>
</property>
```
Edit `conf/flink-conf.yaml`:

```yaml
high-availability: zookeeper
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
high-availability.storageDir: hdfs:///flink/recovery
high-availability.zookeeper.path.root: /flink
yarn.application-attempts: 10
```
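This sets the HA mode to `zookeeper` and the maximum number of application attempts. Then start a YARN session:

```bash
# -n: number of TaskManager containers to allocate (option of older Flink releases)
bin/yarn-session.sh -n 2
```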
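The two attempt settings depend on each other. As far as we know, YARN's ResourceManager caps the attempt count an application requests at `yarn.resourcemanager.am.max-attempts`. It also falls back to that global value when the request is non-positive. With the values above (10 requested, 4 allowed), the session therefore gets at most 4 attempts, which means 3 restarts. The sketch below restates that rule. `effectiveAttempts` is a hypothetical name, not a YARN API.

```java
public class YarnAttempts {
    // Capping rule: a request <= 0 or above the ResourceManager-wide maximum
    // (yarn.resourcemanager.am.max-attempts) falls back to that maximum.
    static int effectiveAttempts(int requested, int rmMaxAttempts) {
        if (requested <= 0 || requested > rmMaxAttempts) {
            return rmMaxAttempts;
        }
        return requested;
    }

    public static void main(String[] args) {
        // yarn.application-attempts = 10, yarn.resourcemanager.am.max-attempts = 4
        int attempts = effectiveAttempts(10, 4);
        // prints: attempts = 4, restarts = 3
        System.out.println("attempts = " + attempts + ", restarts = " + (attempts - 1));
    }
}
```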
How it works
Leader election is implemented with the `LeaderLatch` recipe from the Curator library. If you are not familiar with it, see the Curator documentation on `LeaderLatch`.
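A LeaderLatch works roughly as follows:
- Every participant creates an ephemeral sequential znode under the latch path.
- The participant that owns the node with the lowest sequence number is the leader.
- When that node disappears (for instance because its ZooKeeper session expired), the next one in line takes over.

The sketch below simulates only that ordering rule, in memory. A `TreeMap` stands in for the sequential znodes. It uses no ZooKeeper or Curator API and leaves out the watches that a real latch relies on.

```java
import java.util.Map;
import java.util.TreeMap;

public class LatchSimulation {
    // Sequence number -> participant id; stands in for ephemeral sequential znodes.
    private final TreeMap<Long, String> nodes = new TreeMap<>();
    private long nextSequence = 0;

    // Like creating an EPHEMERAL_SEQUENTIAL child under the latch path.
    long join(String participant) {
        long seq = nextSequence++;
        nodes.put(seq, participant);
        return seq;
    }

    // Like the ephemeral node vanishing when a participant's session ends.
    void leave(long sequence) {
        nodes.remove(sequence);
    }

    // The participant owning the lowest sequence number is the leader.
    String leader() {
        Map.Entry<Long, String> first = nodes.firstEntry();
        return first == null ? null : first.getValue();
    }

    public static void main(String[] args) {
        LatchSimulation latch = new LatchSimulation();
        long first = latch.join("jobmanager@node1");
        latch.join("jobmanager@node2");
        latch.join("jobmanager@node3");
        System.out.println(latch.leader()); // jobmanager@node1
        latch.leave(first);                 // node1 goes away
        System.out.println(latch.leader()); // jobmanager@node2
    }
}
```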
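The main directory structure created in ZooKeeper is shown in the figure below:
[Figure: main ZooKeeper directory structure]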
Main classes involved:
[Figure: main classes involved]
Election:
First, let's look at the constructor of `JobManager`:
[Figure: `JobManager` constructor]
Note two things about `JobManager`:
- Its constructor takes a `LeaderElectionService` as a parameter.
- It implements the `LeaderContender` interface itself.

So how is the `LeaderElectionService` created? The `high-availability: zookeeper` setting drives it. The `createAvailableOrEmbeddedServices` method of the `HighAvailabilityServicesUtils` utility class creates a `HighAvailabilityServices` object. That object's `getJobManagerLeaderElectionService` method then creates the election service:

```java
public static HighAvailabilityServices createAvailableOrEmbeddedServices(
        Configuration config,
        Executor executor) throws Exception {
    // Read the "high-availability" setting from the configuration.
    HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(config);

    switch (highAvailabilityMode) {
        case NONE:
            // No HA: leader election and retrieval are handled in-process.
            return new EmbeddedHaServices(executor);

        case ZOOKEEPER:
            // ZooKeeper HA: create the BLOB store used for HA and a Curator client.
            BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
            return new ZooKeeperHaServices(
                ZooKeeperUtils.startCuratorFramework(config),
                executor,
                config,
                blobStoreService);

        default:
            throw new Exception("High availability mode " + highAvailabilityMode + " is not supported.");
    }
}
```
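For ZooKeeper HA, the election service itself is built by `ZooKeeperUtils.createLeaderElectionService`:

```java
public static ZooKeeperLeaderElectionService createLeaderElectionService(
        final CuratorFramework client,
        final Configuration configuration,
        final String pathSuffix) {
    // Path used by the LeaderLatch for the election itself.
    final String latchPath = configuration.getString(
        HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH) + pathSuffix;
    // Path where the elected leader publishes its address and session ID.
    final String leaderPath = configuration.getString(
        HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH) + pathSuffix;

    return new ZooKeeperLeaderElectionService(client, latchPath, leaderPath);
}
```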
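For the old `JobManager`, the retrieval path quoted at the end of this article suggests that `pathSuffix` is `/{default_jobid}/job_manager_lock`. The sketch below shows how the concatenation in `createLeaderElectionService` produces the final paths. It assumes:
- that suffix;
- the default base paths `/leaderlatch` and `/leader`;
- an all-zero default job ID.

The result is prefixed with the `/flink/cluster_one` namespace from the standalone example. `LeaderPaths` is a hypothetical class, and a plain `Map` stands in for Flink's `Configuration`.

```java
import java.util.Map;

public class LeaderPaths {
    static final String LATCH_KEY = "high-availability.zookeeper.path.latch";
    static final String LEADER_KEY = "high-availability.zookeeper.path.leader";

    // Same concatenation as createLeaderElectionService: configured base path + suffix.
    static String latchPath(Map<String, String> config, String pathSuffix) {
        return config.getOrDefault(LATCH_KEY, "/leaderlatch") + pathSuffix;
    }

    static String leaderPath(Map<String, String> config, String pathSuffix) {
        return config.getOrDefault(LEADER_KEY, "/leader") + pathSuffix;
    }

    public static void main(String[] args) {
        // Assumed suffix for the JobManager: "/" + all-zero default job ID + "/job_manager_lock".
        String suffix = "/" + "0".repeat(32) + "/job_manager_lock";
        String namespace = "/flink/cluster_one"; // path.root + cluster-id
        System.out.println(namespace + latchPath(Map.of(), suffix));
        System.out.println(namespace + leaderPath(Map.of(), suffix));
    }
}
```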
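Next, let's look at `ZooKeeperLeaderElectionService` and `LeaderContender`.

`ZooKeeperLeaderElectionService` mainly manages two paths under the namespace:
- `latchPath` (`/leaderlatch`) is used for the leader election.
- `leaderPath` (`/leader`) stores the address and UUID of the elected leader.

A `LeaderContender` is notified whenever leadership changes, so that it can react accordingly. For example, when it becomes leader it recovers jobs, and when its leadership is revoked it disconnects the registered TaskManagers.

The sequence is as follows:
1. In its `preStart` method, `JobManager` calls the `start` method of `ZooKeeperLeaderElectionService`. This registers listeners on the `LeaderLatch` (`/leaderlatch`) and on a `NodeCache` (`/leader`).
2. When a `LeaderLatch` is elected leader, the `isLeader` method of the corresponding `ZooKeeperLeaderElectionService` is called back.
3. `isLeader` calls `LeaderContender->grantLeadership()` to notify the chosen contender (here, the `JobManager`).
4. The `JobManager` calls `LeaderElectionService->confirmLeaderSessionID()`, which writes the new leader's information to the `/leader` node. The `JobManager` then starts recovering jobs asynchronously.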
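The grant/confirm handshake can be illustrated without ZooKeeper. The in-memory sketch below works like this:
- `isLeader` issues a fresh session ID and calls `grantLeadership`.
- The contender answers with `confirmLeaderSessionID`.
- Only a confirmation that carries the currently issued ID is published to the simulated `/leader` node, which is a plain string field here.

The method names follow the article, but the bodies are a simplified stand-in rather than Flink's implementation. The contender confirms synchronously and skips job recovery, and the `address|uuid` string format is made up for illustration.

```java
import java.util.UUID;

public class LeadershipHandshake {
    // Simplified version of the contender role described in the article.
    interface LeaderContender {
        String getAddress();
        void grantLeadership(UUID leaderSessionID);
    }

    // In-memory stand-in for the election service; no ZooKeeper involved.
    static class ElectionService {
        private final Object lock = new Object();
        private LeaderContender contender;
        private UUID issuedLeaderSessionID;
        private UUID confirmedLeaderSessionID;
        // Plays the role of the data stored in the /leader node.
        String leaderNodeData;

        void start(LeaderContender contender) {
            this.contender = contender;
        }

        // Called when the latch reports that this instance won the election.
        void isLeader() {
            synchronized (lock) {
                issuedLeaderSessionID = UUID.randomUUID();
                confirmedLeaderSessionID = null;
                contender.grantLeadership(issuedLeaderSessionID); // lock is reentrant
            }
        }

        // Publishes leader info only for the session ID that is currently issued.
        void confirmLeaderSessionID(UUID sessionID) {
            synchronized (lock) {
                if (sessionID.equals(issuedLeaderSessionID)) {
                    confirmedLeaderSessionID = sessionID;
                    leaderNodeData = contender.getAddress() + "|" + confirmedLeaderSessionID;
                }
                // Otherwise the confirmation is stale and is ignored.
            }
        }
    }

    public static void main(String[] args) {
        ElectionService service = new ElectionService();
        service.start(new LeaderContender() {
            public String getAddress() {
                return "jobmanager@node1"; // hypothetical address
            }

            public void grantLeadership(UUID id) {
                // A real JobManager would also start job recovery asynchronously here.
                service.confirmLeaderSessionID(id);
            }
        });
        service.isLeader();
        System.out.println(service.leaderNodeData);
    }
}
```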
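The `isLeader` callback of `ZooKeeperLeaderElectionService` looks like this:

```java
public void isLeader() {
    synchronized (lock) {
        if (running) {
            // Issue a fresh session ID; it only counts once the contender confirms it.
            issuedLeaderSessionID = UUID.randomUUID();
            confirmedLeaderSessionID = null;

            if (LOG.isDebugEnabled()) {
                LOG.debug(
                    "Grant leadership to contender {} with session ID {}.",
                    leaderContender.getAddress(),
                    issuedLeaderSessionID);
            }

            // Hand leadership to the contender (here: the JobManager).
            leaderContender.grantLeadership(issuedLeaderSessionID);
        } else {
            LOG.debug("Ignoring the grant leadership notification since the service has " +
                "already been stopped.");
        }
    }
}
```

The `NodeCache` listener on `/leader` watches for changes to the data written there. If the node is deleted or overwritten, it restores the correct leader information.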
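The correction performed by the `NodeCache` listener can be sketched in memory. While an instance holds a confirmed leadership, each change notification for `/leader` makes it compare the node's content with its own address and session ID. It rewrites the node if the node was deleted or holds something else. The `LeaderNodeCorrection` class, the boolean leadership flag and the `address|uuid` encoding are assumptions for illustration, not Flink code.

```java
import java.util.UUID;

public class LeaderNodeCorrection {
    private final String ownAddress;
    private final UUID confirmedSessionID;
    private final boolean hasLeadership;
    // Current content of the simulated /leader node; null means the node is missing.
    String leaderNode;

    LeaderNodeCorrection(String ownAddress, UUID confirmedSessionID, boolean hasLeadership) {
        this.ownAddress = ownAddress;
        this.confirmedSessionID = confirmedSessionID;
        this.hasLeadership = hasLeadership;
    }

    private String expected() {
        return ownAddress + "|" + confirmedSessionID;
    }

    // Invoked on every change notification for the /leader node.
    void nodeChanged() {
        if (!hasLeadership || confirmedSessionID == null) {
            return; // only a confirmed leader repairs the node
        }
        if (leaderNode == null || !leaderNode.equals(expected())) {
            leaderNode = expected(); // write our own leader information back
        }
    }

    public static void main(String[] args) {
        LeaderNodeCorrection leader = new LeaderNodeCorrection("jm1", UUID.randomUUID(), true);
        leader.nodeChanged();                               // node missing: rewritten
        System.out.println(leader.leaderNode);
        leader.leaderNode = "stale|" + UUID.randomUUID();   // node overwritten externally
        leader.nodeChanged();
        System.out.println(leader.leaderNode.startsWith("jm1|")); // true
    }
}
```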
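After a new leader is elected, it must recover the submitted jobs and their checkpoints. This involves `SubmittedJobGraphStore` and `CheckpointRecoveryFactory`, the two classes circled in red in the `JobManager` constructor figure. They will be covered in detail in a later article.

Retrieval: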
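`ZooKeeperLeaderRetrievalService` watches the `/flink/{cluster_id}/leader/{default_jobid}/job_manager_lock` node for changes and reads its data. It then passes the data to the objects that implement `LeaderRetrievalListener`, through their `notifyLeaderAddress` method. One example is updating the `leaderGateway` of `FlinkMiniCluster`.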
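On the retrieval side, the listener only needs to hear about actual changes. The sketch below:
- decodes the leader node, assuming the election side stores the address as a UTF string followed by the serialized `UUID` (written with `ObjectOutputStream`);
- forwards the result to a listener only when the address or session ID differs from the last values seen.

`LeaderRetrieval` is a hypothetical class, and a `BiConsumer` plays the role of `notifyLeaderAddress`. Treat the byte format as an assumption.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Objects;
import java.util.UUID;
import java.util.function.BiConsumer;

public class LeaderRetrieval {
    // Assumed node format: UTF-encoded address followed by the serialized UUID.
    static byte[] encode(String address, UUID sessionID) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeUTF(address);
            out.writeObject(sessionID);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    private final BiConsumer<String, UUID> listener; // plays notifyLeaderAddress
    private String lastAddress;
    private UUID lastSessionID;

    LeaderRetrieval(BiConsumer<String, UUID> listener) {
        this.listener = listener;
    }

    // Called with the node's current data; null means the node does not exist.
    void nodeChanged(byte[] data) {
        String address = null;
        UUID sessionID = null;
        if (data != null && data.length > 0) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
                address = in.readUTF();
                sessionID = (UUID) in.readObject();
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException("Could not decode leader information", e);
            }
        }
        // Notify only when the published leader actually changed.
        if (!Objects.equals(address, lastAddress) || !Objects.equals(sessionID, lastSessionID)) {
            lastAddress = address;
            lastSessionID = sessionID;
            listener.accept(address, sessionID);
        }
    }

    public static void main(String[] args) {
        LeaderRetrieval retrieval = new LeaderRetrieval(
                (address, id) -> System.out.println("new leader: " + address + " / " + id));
        byte[] data = encode("jobmanager@node1", UUID.randomUUID());
        retrieval.nodeChanged(data); // notifies once
        retrieval.nodeChanged(data); // same leader: no notification
    }
}
```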