Flink-Yarn安装及使用

Posted   

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink-Yarn安装及使用相关的知识,希望对你有一定的参考价值。

Flink-Yarn安装及使用

1.背景介绍

独立(Standalone)模式由Flink自身提供资源,无需其他框架,这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但我们知道,Flink是大数据计算框架,不是资源调度框架,这并不是它的强项;所以还是应该让专业的框架做专业的事,和其它资源调度框架集成更靠谱。而在目前大数据生态中,国内应用最为广泛的资源管理平台就是YARN了。所以接下来我们就将学习,在强大的YARN平台上Flink是如何集成部署的。
整体来说,YARN上部署的过程是:客户端把Flink应用提交给Yarn的ResourceManager, Yarn的ResourceManager会向Yarn的NodeManager申请容器。在这些容器上,Flink会部署JobManager和TaskManager的实例,从而启动集群。Flink会根据运行在JobManger上的作业所需要的Slot数量动态分配TaskManager资源。

2.安装

2.1前提

Hadoop集群启动,(包括Hdfs和Yarn)

2.2下载安装包及解压

进入Flink官网,下载1.12.0版本安装包flink-1.12.0-bin-scala_2.11.tgz,注意此处选用对应scala版本为scala 2.11的安装包。
解压到指定路径,修改文件名

mv flink-1.12.0/ flink-yarn

2.3 配置环境变量

在/etc/profile添加

export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`

2.4 修改配置

在flink-conf.yaml文件中还可以对集群中的JobManager和TaskManager组件进行优化配置,主要配置项如下:

jobmanager.memory.process.size:对JobManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1600M,可以根据集群规模进行适当调整。
taskmanager.memory.process.size:对TaskManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1600M,可以根据集群规模进行适当调整。
taskmanager.numberOfTaskSlots:对每个TaskManager能够分配的Slot数量进行配置,默认为1,可根据TaskManager所在的机器能够提供给Flink的CPU数量决定。所谓Slot就是TaskManager中具体运行一个任务所分配的计算资源。
parallelism.default:Flink任务执行的默认并行度,优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量。
默认配置:
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 8
parallelism.default: 1

3.使用

3.1 会话模式部署

ARN的会话模式与独立集群略有不同,需要首先申请一个YARN会话(YARN session)来启动Flink集群。具体步骤如下:

1. 启动集群

(1)启动hadoop集群(HDFS, YARN)
(2)执行脚本命令向YARN集群申请资源,开启一个YARN会话,启动Flink集群。

bin/yarn-session.sh -nm test

可用参数解读:

-d:分离模式,如果你不想让Flink YARN客户端一直前台运行,可以使用这个参数,即使关掉当前对话窗口,YARN session也可以后台运行。需要放在其他参数前边,否则不生效。
-jm(--jobManagerMemory):配置JobManager所需内存,默认单位MB。
-nm(--name):配置在YARN UI界面上显示的任务名。
-qu(--queue):指定YARN队列名。
-tm(--taskManager):配置每个TaskManager所使用内存。

注意:Flink1.11.0版本不再使用-n参数和-s参数分别指定TaskManager数量和slot数量,YARN会按照需求动态分配TaskManager和slot。所以从这个意义上讲,YARN的会话模式也不会把集群资源固定,同样是动态分配的。

YARN session启动之后会给出一个web UI地址以及一个YARN application ID,用户可以通过web UI或者命令行两种方式提交作业。

2. 提交作业

(1)通过Web UI提交作业
这种方式比较简单,与上文所述standalone部署模式基本相同。
(2)通过命令行提交作业
① 将Standalone模式讲解中打包好的任务运行JAR包上传至集群
② 执行以下命令将该任务提交到已经开启的Yarn-Session中运行。

 bin/flink run -c WordCount Flink-1.0-SNAPSHOT.jar

客户端可以自行确定JobManager的地址,也可以通过-m或者-jobmanager参数指定JobManager的地址,JobManager的地址在YARN session的启动页面中可以找到。
③ 任务提交成功后,可在YARN的Web UI界面查看运行情况
可以看到我们创建的Yarn-Session实际上是一个Yarn的Application,并且有唯一的Application ID。
④也可以通过Flink的Web UI页面查看提交任务的运行情况

3.2单作业模式部署

在YARN环境中,由于有了外部平台做资源调度,所以我们也可以直接向YARN提交一个单独的作业,从而启动一个Flink集群。
(1)执行命令提交作业

bin/flink run -d -t yarn-per-job -c WordCount Flink-1.0-SNAPSHOT.jar

早期版本也有另一种写法:

bin/flink run -m yarn-cluster -c WordCount Flink-1.0-SNAPSHOT.jar

注意这里是通过参数-m yarn-cluster指定向YARN集群提交任务。
参数:

 -yn,--container <arg> 表示分配容器的数量,也就是 TaskManager 的数量。
 -d,--detached:设置在后台运行。
 -yjm,--jobManagerMemory<arg>:设置 JobManager 的内存,单位是 MB。
 -ytm,--taskManagerMemory<arg>:设置每个 TaskManager 的内存,单位是 MB。
 -ynm,--name:给当前 Flink application 在 Yarn 上指定名称。
 -yq,--query:显示 yarn 中可用的资源(内存、cpu 核数)
 -yqu,--queue<arg> :指定 yarn 资源队列
 -ys,--slots<arg> :每个 TaskManager 使用的 Slot 数量。
 -yz,--zookeeperNamespace<arg>:针对 HA 模式在 Zookeeper 上创建 NameSpace
 -yid,--applicationID<yarnAppId> : 指定 Yarn 集群上的任务 ID,附着到一个后台独立运行的 Yarn Session 中。

(2)在YARN的ResourceManager界面查看执行情况

点击可以打开Flink Web UI页面进行监控:

(3)在命令行中查看或取消作业

./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>

这里的application_XXXX_YY是当前应用的ID,是作业的ID。注意如果取消作业,整个Flink集群也会停掉。

3.3 应用模式部署

应用模式同样非常简单,与单作业模式类似,直接执行flink run-application命令即可。
(1)执行命令提交作业

bin/flink run-application -t yarn-application -c WordCount Flink-1.0-SNAPSHOT.jar 

(2)在命令行中查看或取消作业

./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>

(3)也可以通过yarn.provided.lib.dirs配置选项指定位置,将jar上传到远程:

./bin/flink run-application -t yarn-application	-Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir"	hdfs://myhdfs/jars/my-application.jar

这种方式下jar可以预先上传到HDFS,而不需要单独发送到集群,这就使得作业提交更加轻量了。

3.4 高可用

YARN模式的高可用和Standalone模式的高可用原理不一样。
Standalone模式中, 同时启动多个Jobmanager, 一个为“领导者”(leader),其他为“后备”(standby), 当leader挂了, 其他的才会有一个成为leader。
而YARN的高可用是同时只启动一个Jobmanager, 当这个Jobmanager挂了之后, YARN会再次启动一个, 所以其实是利用的YARN的重试次数来实现的高可用。
(1)在yarn-site.xml中配置

<property>
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>4</value>
  <description>
    The maximum number of application master execution attempts.
  </description>
</property>

注意: 配置完不要忘记分发, 和重启YARN
(2)在flink-conf.yaml中配置

yarn.application-attempts: 3
high-availability: zookeeper
high-availability.storageDir: hdfs://ns1/flink/yarn/ha
high-availability.zookeeper.quorum: hmcs030:2181,hmcs031:2181,hmcs032:2181
high-availability.zookeeper.path.root: /flink-yarn

(3)启动yarn-session
(4)杀死Jobmanager, 查看的他的复活情况
注意: yarn-site.xml中是它活的次数的上限, flink-conf.xml中的次数应该小于这个值。

以上是关于Flink-Yarn安装及使用的主要内容,如果未能解决你的问题,请参考以下文章

FLINK安装及提交任务

FLINK安装及提交任务

MacOS下安装Apache Flink及测试WordCount

Flink安装及实例教程

Flink学习之流处理原理

Flink学习之流处理原理