Flink JobManager高可用性(HA)
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink JobManager高可用性(HA)相关的知识,希望对你有一定的参考价值。
参考技术AJobManager 协调每个 Flink 部署。它负责调度和资源管理。
默认情况下,每个 Flink 集群只有一个 JobManager 实例。 这会产生单点故障(SPOF):如果 JobManager 崩溃,则无法提交新作业并且导致运行中的作业运行失败。
使用 JobManager 高可用性模式,可以避免这个问题,从而消除 SPOF。您可以为 Standalone 和 YARN 集群 配置高可用性。
针对 Standalone 集群的 JobManager 高可用性的一般概念是,任何时候都有一个 主 JobManager 和 多个备 JobManagers ,以便在主节点失败时有备 JobManagers 来接管集群。这保证了 没有单点故障 ,一旦备 JobManager 接管集群,作业就可以正常运行。主备 JobManager 实例之间没有明显的区别。每个 JobManager 都可以充当主备节点。
例如,请考虑以下三个 JobManager 实例的设置:
要启用 JobManager 高可用性,您必须将 高可用性模式设置 为 zookeeper,配置 zookeeper quorum 将所有 JobManager 主机及其 web UI 端口写入 配置文件 。
Flink利用 ZooKeeper 在所有正在运行的 JobManager 实例之间进行分布式协调。 ZooKeeper 是独立于 Flink 的服务,通过 Leader 选举和轻量级一致状态存储提供高可靠的分布式协调。 更多关于 ZooKeeper 的信息, 请查看 ZooKeeper 的入门指南 。 Flink 包含用于 Bootstrap ZooKeeper 安装的脚本。
要启动HA集群,请在以下位置配置Master文件 conf/masters:
默认情况下,job manager选一个随机端口作为进程随机通信端口。您可以通过 high-availability.jobmanager.port 键修改此设置。此配置接受单个端口(例如50010),范围(50000-50025)或两者的组合(50010,50011,50020-50025,50050-50075)。
要启动HA集群,请将以下配置键添加到 conf/flink-conf.yaml:
每个 addressX:port 都是一个 ZooKeeper 服务器的ip及其端口,Flink 可以在指定的地址和端口访问zookeeper。
重要: 在运行 YARN 或其他群集管理器中运行时,不要手动设置此值。在这些情况下,将根据应用程序 ID 自动生成 cluster-id。 手动设置 cluster-id 会覆盖 YARN 中的自动生成的 ID。反过来,使用 -z CLI 选项指定 cluster-id 会覆盖手动配置。如果在裸机上运行多个 Flink HA 集群,则必须为每个集群手动配置单独的 cluster-id。
该storageDir 中保存了 JobManager 恢复状态所需的所有元数据。
配置 master 文件和 ZooKeeper quorum 之后,您可以使用提供的集群启动脚本。它们将启动 HA 群集。请注意,启动 Flink HA 集群前,必须启动 Zookeeper 集群 ,并确保为要启动的每个 HA 群集 配置单独的 ZooKeeper 根路径 。
在运行高可用性 YARN 集群时, 我们不会运行多个 JobManager (ApplicationMaster) 实例 ,而只运行一个,该JobManager实例失败时,YARN会将其重新启动。Yarn的具体行为取决于您使用的 YARN 版本。
在YARN 配置文件 yarn-site.xml 中,需要配置 application master 的最大重试次数:
当前 YARN 版本的默认值是2(表示允许单个JobManager失败两次)。
除了HA配置(参考上文)之外,您还必须配置最大重试次数 conf/flink-conf.yaml:
这意味着在如果程序启动失败,YARN会再重试9次(9 次重试 + 1次启动)。如果 YARN 操作需要,如果启动10次作业还失败,yarn才会将该任务的状态置为失败。如果抢占,节点硬件故障或重启,NodeManager 重新同步等操作需要,YARN继续尝试启动应用。 这些重启不计入 yarn.application-attempts 个数中,请参阅 Jian Fang 的博客文章 。重要的是要注意 yarn.resourcemanager.am.max-attempts 为yarn中程序重启上限。因此, Flink 中设置的程序尝试次数不能超过 YARN 的集群设置。
注意: Hadoop YARN 2.4.0 有一个主要的 bug (在2.5.0中修复),阻止重新启动的Application Master / Job Manager 重启容器。有关详细信息,请参阅 FLINK-4142 。我们建议,在yarn版本要等于或高于Hadoop 2.5.0 增加高可用配置。
如果 ZooKeeper 使用 Kerberos 以安全模式运行,flink-conf.yaml 根据需要覆盖以下配置:
有关 Kerberos 安全性的 Flink 配置的更多信息,请参阅 此处 。您还可以在 此处 找到关于 Flink 内部如何设置基于 kerberos 的安全性的详细信息。
如果您没有正在运行的ZooKeeper,则可以使用Flink程序附带的脚本。
这是一个 ZooKeeper 配置模板 conf/zoo.cfg。您可以为主机配置为使用 server.X 条目运行 ZooKeeper,其中 X 是每个服务器的唯一IP:
该脚本 bin/start-zookeeper-quorum.sh 将在每个配置的主机上启动 ZooKeeper 服务器。 Flink wrapper 会启动 ZooKeeper 服务,该 wraper 从 conf/zoo.cfg 中读取配置,并设置一些必需的配置项。在生产设置中,建议您使用自己安装的 ZooKeeper。
搭建高可用的flink JobManager HA
JobManager协调每个flink应用的部署,它负责执行定时任务和资源管理。
每一个Flink集群都有一个jobManager, 如果jobManager出现问题之后,将不能提交新的任务和运行新任务失败,这样会造成单点失败,所以需要构建高可用的JobMangager。
类似zookeeper一样,构建好了高可用的jobManager之后,如果其中一个出现问题之后,其他可用的jobManager将会接管任务,变为leader。不会造成flink的任务执行失败。可以在单机版和集群版构建jobManager。
下面开始构建一个单机版flink的JobManger高可用HA版。
首先需要设置SSH免密登录,因为启动的时候程序会通过远程登录访问并且启动程序。
执行命令,就可以免密登录自己的机器了。如果不进行免密登录的话,那么启动的hadoop的时候会报 "start port 22 connection refused"。
ssh-keygen -t rsa ssh-copy-id -i ~/.ssh/id_rsa.pub huangqingshi@localhost
接下来在官网上下载hadoop的binary文件,然后开始解压,我下载的版本为hadoop-3.1.3版本。安装Hadoop的目的是用hadoop存储flink的JobManager高可用的元数据信息。
我安装的是hadoop的单机版,可以构建hadoop集群版。接下来进行hadoop的配置。
配置etc/hadoop/coresite.xml,指定namenode的hdfs协议文件系统的通信地址及临时文件目录。
<configuration> <property> <!--指定namenode的hdfs协议文件系统的通信地址--> <name>fs.defaultFS</name> <value>hdfs://127.0.0.1:9000</value> </property> <property> <!--指定hadoop集群存储临时文件的目录--> <name>hadoop.tmp.dir</name> <value>/tmp/hadoop/tmp</value> </property> </configuration>
配置etc/hadoop/hdfs-site.xml, 设置元数据的存放位置,数据块的存放位置,DFS监听端口。
<configuration> <property> <!--namenode 节点数据(即元数据)的存放位置,可以指定多个目录实现容错,多个目录用逗号分隔--> <name>dfs.namenode.name.dir</name> <value>/tmp/hadoop/namenode/data</value> </property> <property> <!--datanode 节点数据(即数据块)的存放位置--> <name>dfs.datanode.data.dir</name> <value>/tmp/hadoop/datanode/data</value> </property> <property> <!--手动设置DFS监听端口--> <name>dfs.http.address</name> <value>127.0.0.1:50070</value> </property> </configuration>
配置etc/hadoop/yarn-site.xml,配置NodeManager上运行的附属服务以及resourceManager主机名。
<configuration> <!-- Site specific YARN configuration properties --> <property> <!--配置NodeManger上运行的附属服务。需要配置成mapreduce_shuffle后才可以在Yarn上运行MapReduce程序--> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <property> <!--resourcemanager 的主机名--> <name>yarn.resourcemanager.hostname</name> <value>localhost</value> </property> </configuration>
配置etc/hadoop/mapred-site.xml,指定mapreduce作业运行在yarn上。
<property> <!--指定mapreduce作业运行在yarn上--> <name>mapreduce.framework.name</name> <value>yarn</value> </property> </configuration>
需要执行nameNode的format操作,不执行直接启动会报“NameNode is not formatted.”。
bin/hdfs namenode -format
接下来启动hadoop,如果成功的话,可以访问如下URL:
http://localhost:8088/ 查看构成cluster的节点
http://localhost:8042/node 查看node的相关信息。
以上说明hadoop单机版搭建完成。
接下来需要下载一个flink的hadoop插件,要不然flink启动的时候会报错的。
把下载的插件放到flink文件的lib文件夹中。
配置一下flink文件夹的conf/flink-conf.yaml。指定HA高可用模式为zookeeper,元数据存储路径用于恢复master,zookeeper用于flink的 checkpoint 以及 leader 选举。最后一条为zookeeper单机或集群的地址。
high-availability: zookeeper
high-availability.storageDir: hdfs://127.0.0.1:9000/flink/ha
high-availability.zookeeper.quorum: localhost:2181
其他的采用默认配置,比如JobManager的最大堆内存为1G,每一个TaskManager提供一个task slot,执行串行的任务。
接下来配置flink的 conf/masters 用于启动两个主节点JobManager。
localhost:8081
localhost:8082
配置flink的 conf/slaver 用于配置三个从节点TaskManager。
localhost
localhost
localhost
进入zookeeper路径并且启动zookeeper
bin/zkServer.sh start
进入flink路径并启动flink。
bin/start-cluster.sh conf/flink-conf.yaml
启动截图说明启动了两个节点的HA集群。
执行jps,两个JobManager节点和三个TaskManager节点:
浏览器访问 http://localhost:8081 和 http://localhost:8082,查看里边的日志,搜索granted leadership的说明是主JobManager,如下图。8082端口说明为主JobMaster
一个JobManager, 里边有三个TaskManager,两个JobManager共享这三个TaskManager:
接下来我们来验证一下集群的HA功能,我们已经知道8082为主JobManager,然后我们找到它的PID,使用如下命令:
ps -ef | grep StandaloneSession
我们将其kill掉,执行命令kill -9 51963,此时在访问localhost:8082 就不能访问了。localhost:8081 还可以访问,还可以提供服务。接下来咱们重新 启动flink的JobManager 8082 端口。
bin/jobmanager.sh start localhost 8082
此时8081已经成为leader了,继续提供高可用的HA了。
好了,到此就算搭建完成了。
以上是关于Flink JobManager高可用性(HA)的主要内容,如果未能解决你的问题,请参考以下文章
Flink Flink JobManager HA 机制的扩展与实现
2.Flink安装部署Local本地模式-了解Standalone独立集群模式Standalone-HA高可用集群模式(原理|操作|测试)