Flink JobManager高可用性(HA)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink JobManager高可用性(HA)相关的知识,希望对你有一定的参考价值。

参考技术A

JobManager 协调每个 Flink 部署。它负责调度和资源管理。

默认情况下,每个 Flink 集群只有一个 JobManager 实例。 这会产生单点故障(SPOF):如果 JobManager 崩溃,则无法提交新作业并且导致运行中的作业运行失败。

使用 JobManager 高可用性模式,可以避免这个问题,从而消除 SPOF。您可以为 Standalone YARN 集群 配置高可用性。

针对 Standalone 集群的 JobManager 高可用性的一般概念是,任何时候都有一个 主 JobManager 多个备 JobManagers ,以便在主节点失败时有备 JobManagers 来接管集群。这保证了 没有单点故障 ,一旦备 JobManager 接管集群,作业就可以正常运行。主备 JobManager 实例之间没有明显的区别。每个 JobManager 都可以充当主备节点。

例如,请考虑以下三个 JobManager 实例的设置:

要启用 JobManager 高可用性,您必须将 高可用性模式设置 为 zookeeper,配置 zookeeper quorum 将所有 JobManager 主机及其 web UI 端口写入 配置文件

Flink利用 ZooKeeper 在所有正在运行的 JobManager 实例之间进行分布式协调。 ZooKeeper 是独立于 Flink 的服务,通过 Leader 选举和轻量级一致状态存储提供高可靠的分布式协调。 更多关于 ZooKeeper 的信息, 请查看 ZooKeeper 的入门指南 。 Flink 包含用于 Bootstrap ZooKeeper 安装的脚本。

要启动HA集群,请在以下位置配置Master文件 conf/masters:

默认情况下,job manager选一个随机端口作为进程随机通信端口。您可以通过 high-availability.jobmanager.port 键修改此设置。此配置接受单个端口(例如50010),范围(50000-50025)或两者的组合(50010,50011,50020-50025,50050-50075)。

要启动HA集群,请将以下配置键添加到 conf/flink-conf.yaml:

每个 addressX:port 都是一个 ZooKeeper 服务器的ip及其端口,Flink 可以在指定的地址和端口访问zookeeper。

重要: 在运行 YARN 或其他群集管理器中运行时,不要手动设置此值。在这些情况下,将根据应用程序 ID 自动生成 cluster-id。 手动设置 cluster-id 会覆盖 YARN 中的自动生成的 ID。反过来,使用 -z CLI 选项指定 cluster-id 会覆盖手动配置。如果在裸机上运行多个 Flink HA 集群,则必须为每个集群手动配置单独的 cluster-id。

该storageDir 中保存了 JobManager 恢复状态所需的所有元数据。

配置 master 文件和 ZooKeeper quorum 之后,您可以使用提供的集群启动脚本。它们将启动 HA 群集。请注意,启动 Flink HA 集群前,必须启动 Zookeeper 集群 ,并确保为要启动的每个 HA 群集 配置单独的 ZooKeeper 根路径

在运行高可用性 YARN 集群时, 我们不会运行多个 JobManager (ApplicationMaster) 实例 ,而只运行一个,该JobManager实例失败时,YARN会将其重新启动。Yarn的具体行为取决于您使用的 YARN 版本。

在YARN 配置文件 yarn-site.xml 中,需要配置 application master 的最大重试次数:

当前 YARN 版本的默认值是2(表示允许单个JobManager失败两次)。

除了HA配置(参考上文)之外,您还必须配置最大重试次数 conf/flink-conf.yaml:

这意味着在如果程序启动失败,YARN会再重试9次(9 次重试 + 1次启动)。如果 YARN 操作需要,如果启动10次作业还失败,yarn才会将该任务的状态置为失败。如果抢占,节点硬件故障或重启,NodeManager 重新同步等操作需要,YARN继续尝试启动应用。 这些重启不计入 yarn.application-attempts 个数中,请参阅 Jian Fang 的博客文章 。重要的是要注意 yarn.resourcemanager.am.max-attempts 为yarn中程序重启上限。因此, Flink 中设置的程序尝试次数不能超过 YARN 的集群设置。

注意: Hadoop YARN 2.4.0 有一个主要的 bug (在2.5.0中修复),阻止重新启动的Application Master / Job Manager 重启容器。有关详细信息,请参阅 FLINK-4142 。我们建议,在yarn版本要等于或高于Hadoop 2.5.0 增加高可用配置。

如果 ZooKeeper 使用 Kerberos 以安全模式运行,flink-conf.yaml 根据需要覆盖以下配置:

有关 Kerberos 安全性的 Flink 配置的更多信息,请参阅 此处 。您还可以在 此处 找到关于 Flink 内部如何设置基于 kerberos 的安全性的详细信息。

如果您没有正在运行的ZooKeeper,则可以使用Flink程序附带的脚本。

这是一个 ZooKeeper 配置模板 conf/zoo.cfg。您可以为主机配置为使用 server.X 条目运行 ZooKeeper,其中 X 是每个服务器的唯一IP:

该脚本 bin/start-zookeeper-quorum.sh 将在每个配置的主机上启动 ZooKeeper 服务器。 Flink wrapper 会启动 ZooKeeper 服务,该 wraper 从 conf/zoo.cfg 中读取配置,并设置一些必需的配置项。在生产设置中,建议您使用自己安装的 ZooKeeper。

搭建高可用的flink JobManager HA

  JobManager协调每个flink应用的部署,它负责执行定时任务和资源管理。

  每一个Flink集群都有一个jobManager, 如果jobManager出现问题之后,将不能提交新的任务和运行新任务失败,这样会造成单点失败,所以需要构建高可用的JobMangager。

  类似zookeeper一样,构建好了高可用的jobManager之后,如果其中一个出现问题之后,其他可用的jobManager将会接管任务,变为leader。不会造成flink的任务执行失败。可以在单机版和集群版构建jobManager。

  下面开始构建一个单机版flink的JobManger高可用HA版。

  首先需要设置SSH免密登录,因为启动的时候程序会通过远程登录访问并且启动程序。

  执行命令,就可以免密登录自己的机器了。如果不进行免密登录的话,那么启动的hadoop的时候会报 "start port 22 connection refused"。

ssh-keygen -t rsa

ssh-copy-id -i ~/.ssh/id_rsa.pub huangqingshi@localhost

  接下来在官网上下载hadoop的binary文件,然后开始解压,我下载的版本为hadoop-3.1.3版本。安装Hadoop的目的是用hadoop存储flink的JobManager高可用的元数据信息。

  我安装的是hadoop的单机版,可以构建hadoop集群版。接下来进行hadoop的配置。

  配置etc/hadoop/coresite.xml,指定namenode的hdfs协议文件系统的通信地址及临时文件目录。

<configuration>
    <property>
        <!--指定namenode的hdfs协议文件系统的通信地址-->
        <name>fs.defaultFS</name>
        <value>hdfs://127.0.0.1:9000</value>
    </property>
    <property>
        <!--指定hadoop集群存储临时文件的目录-->
        <name>hadoop.tmp.dir</name>
        <value>/tmp/hadoop/tmp</value>
    </property>
</configuration>

  配置etc/hadoop/hdfs-site.xml, 设置元数据的存放位置,数据块的存放位置,DFS监听端口。

<configuration>
    <property>
        <!--namenode 节点数据(即元数据)的存放位置,可以指定多个目录实现容错,多个目录用逗号分隔-->
        <name>dfs.namenode.name.dir</name>
        <value>/tmp/hadoop/namenode/data</value>
    </property>
    <property>
        <!--datanode 节点数据(即数据块)的存放位置-->
        <name>dfs.datanode.data.dir</name>
        <value>/tmp/hadoop/datanode/data</value>
    </property>
    <property>
        <!--手动设置DFS监听端口-->
        <name>dfs.http.address</name>
        <value>127.0.0.1:50070</value>
    </property>
</configuration>

  配置etc/hadoop/yarn-site.xml,配置NodeManager上运行的附属服务以及resourceManager主机名。

<configuration>

<!-- Site specific YARN configuration properties -->
    <property>
        <!--配置NodeManger上运行的附属服务。需要配置成mapreduce_shuffle后才可以在Yarn上运行MapReduce程序-->
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <!--resourcemanager 的主机名-->
        <name>yarn.resourcemanager.hostname</name>
        <value>localhost</value>
    </property>
</configuration>

  配置etc/hadoop/mapred-site.xml,指定mapreduce作业运行在yarn上。

    <property>
        <!--指定mapreduce作业运行在yarn上-->
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>

  需要执行nameNode的format操作,不执行直接启动会报“NameNode is not formatted.”。

bin/hdfs namenode -format

  接下来启动hadoop,如果成功的话,可以访问如下URL:

  http://localhost:50070/

  技术图片

 

 

   http://localhost:8088/ 查看构成cluster的节点

技术图片

 

   http://localhost:8042/node 查看node的相关信息。

技术图片

 

 

  以上说明hadoop单机版搭建完成。  

  接下来需要下载一个flink的hadoop插件,要不然flink启动的时候会报错的。

  下载地址为:https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-7.0/flink-shaded-hadoop-2-uber-2.8.3-7.0.jar

  把下载的插件放到flink文件的lib文件夹中。

  配置一下flink文件夹的conf/flink-conf.yaml。指定HA高可用模式为zookeeper,元数据存储路径用于恢复master,zookeeper用于flink的 checkpoint 以及 leader 选举。最后一条为zookeeper单机或集群的地址。

high-availability: zookeeper
high-availability.storageDir: hdfs://127.0.0.1:9000/flink/ha
high-availability.zookeeper.quorum: localhost:2181

  其他的采用默认配置,比如JobManager的最大堆内存为1G,每一个TaskManager提供一个task slot,执行串行的任务。

  接下来配置flink的 conf/masters 用于启动两个主节点JobManager。

localhost:8081
localhost:8082

  配置flink的 conf/slaver 用于配置三个从节点TaskManager。

localhost
localhost
localhost

  进入zookeeper路径并且启动zookeeper

bin/zkServer.sh start

  进入flink路径并启动flink。  

bin/start-cluster.sh conf/flink-conf.yaml

  启动截图说明启动了两个节点的HA集群。 

 技术图片

  执行jps,两个JobManager节点和三个TaskManager节点:

  技术图片

 

 

 

 

   浏览器访问 http://localhost:8081 和 http://localhost:8082,查看里边的日志,搜索granted leadership的说明是主JobManager,如下图。8082端口说明为主JobMaster

技术图片

 

   一个JobManager, 里边有三个TaskManager,两个JobManager共享这三个TaskManager:

技术图片

 

   接下来我们来验证一下集群的HA功能,我们已经知道8082为主JobManager,然后我们找到它的PID,使用如下命令:

ps -ef | grep StandaloneSession

  技术图片

 

   我们将其kill掉,执行命令kill -9 51963,此时在访问localhost:8082 就不能访问了。localhost:8081 还可以访问,还可以提供服务。接下来咱们重新 启动flink的JobManager 8082 端口。

bin/jobmanager.sh start localhost 8082

  此时8081已经成为leader了,继续提供高可用的HA了。

技术图片

 

   好了,到此就算搭建完成了。

      

  

以上是关于Flink JobManager高可用性(HA)的主要内容,如果未能解决你的问题,请参考以下文章

Flink HA

Flink的standAlone模式的HA环境

Flink Flink JobManager HA 机制的扩展与实现

2.Flink安装部署Local本地模式-了解Standalone独立集群模式Standalone-HA高可用集群模式(原理|操作|测试)

Flink的高可用原理

【Flink on k8s】高可用的关键机制及configmap数据详解