Flink在YARN上支持的三种部署方式及使用方式

Posted penriver

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink在YARN上支持的三种部署方式及使用方式相关的知识,希望对你有一定的参考价值。

本文基于flink 1.13.x yarn 3.1.x编写并验证

1 前提

  • 安装yarn集群并启动

  • 配置HADOOP_CLASSPATH环境变量

export HADOOP_CLASSPATH=`hadoop classpath`

注意:默认情况下,所有必需的Hadoop配置文件都是通过HADOOP_CLASSPATH环境变量从类路径加载的。

2 Flink在YARN上支持的部署方式

对于生产使用,建议Per-job or Application Mode部署Flink应用程序,因为这些模式为应用程序提供了更好的隔离
详见参见: flink on yarn

2.1 Application Mode

Application Mode将在YARN上启动一个Flink集群,其中应用程序jar的main()方法将在YARN中的JobManager上执行。

应用程序一完成,集群就会关闭。

可以通过yarn application -kill 或取消Flink作业手动停止集群。

#提交任务
./bin/flink run-application -t yarn-application ./examples/batch/WordCount.jar --input hdfs://pci01:8020/tmp/README.txt

./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar -d

#列出集群上正在运行的作业,列出jobId、jobName
./bin/flink list -t yarn-application -Dyarn.application.id=application_1626944535001_1008

#取消任务: jobId
#请注意,取消应用程序集群上的作业将停止该集群。
./bin/flink cancel -t yarn-application -Dyarn.application.id=application_1626944535001_1008 825e0311eef66f586d0977a7c20a432d

为了释放Application Mode的全部潜力,请考虑与yarn.provided.lib.dirs配置项一起使用

将flink的依赖jar、应用程序jar上传到集群中所有节点都可以访问的位置。

下述操作将使作业提交变得更加轻量级,因为所需的Flink jar和应用程序jar将由指定的远程位置提取,而不是由客户机发送到集群。

#将flink lib及应用的jar上传到hdfs
hdfs dfs -mkdir -p hdfs://pci01:8020/jars/flink
hdfs dfs -copyFromLocal lib/*.jar hdfs://pci01:8020/jars/flink/
 
hdfs dfs -mkdir -p hdfs://pci01:8020/jars/apps
hdfs dfs -copyFromLocal ./examples/streaming/TopSpeedWindowing.jar hdfs://pci01:8020/jars/apps/
hdfs dfs -copyFromLocal ./examples/batch/WordCount.jar hdfs://pci01:8020/jars/apps/
 
#运行TopSpeedWindowing
./bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://pci01:8020/jars/flink/" hdfs://pci01:8020/jars/apps/TopSpeedWindowing.jar

#运行wordcount示例
./bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://pci01:8020/jars/flink/" hdfs://pci01:8020/jars/apps/WordCount.jar --input hdfs://pci01:8020/tmp/README.txt --output hdfs://pci01:8020/tmp/wordcount-result10.txt
hdfs dfs -cat hdfs://pci01:8020/tmp/wordcount-result10.tx

2.2 Per-Job Cluster Mode

Per-Job Cluster Mode将在YARN上启动一个Flink集群,然后在本地运行提供的应用程序jar,最后将JobGraph提交给YARN上的JobManager。如果传递–detached参数,客户端将在提交被接受后停止。

一个任务会对应一个Job,每提交一个作业会根据自身的情况,都会单独向yarn申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享Dispatcher和ResourceManager按需接受资源申请;适合规模大长时间运行的作业

一旦作业停止,Flink集群就会停止。

#提交任务
./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar

#列出集群上正在运行的作业, 列出jobId、jobName
./bin/flink list -t yarn-per-job -Dyarn.application.id=application_1626944535001_1015

#取消任务: jobId
#请注意,取消应用程序集群上的作业将停止该集群。
./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_1626944535001_1015 3386aec1fb6fba288c13a17b5cb6730b

2.3 Session Mode

Session-Cluster模式需要先启动集群,然后再提交作业,接着会向yarn申请一块空间后,资源永远保持不变。如果资源满了,下一个作业就无法提交,只能等到yarn中的其中一个作业执行完成后,释放了资源,下个作业才会正常提交。所有作业共享Dispatcher和ResourceManager;共享资源;适合规模小执行时间短的作业。

Session-Cluster的资源在启动集群时就定义完成,后续所有作业的提交都共享该资源,作业可能会互相影响,因此比较适合小规模短时间运行的作业

Session Mode将在/tmp/.yarn-properties-目录下创建一个隐藏的YARN properties文件,它将在提交作业时用于集群发现,并在命令行界面中显示,有如下两种操作模式:

  • attached mode (default): yarn-session.sh 客户端将Flink集群提交给YARN,但是客户端继续运行,跟踪集群的状态。如果集群失败,客户端将显示错误。如果客户端被终止,它也会发出关闭集群的信号。

  • detached mode (-d or --detached): 将Flink集群提交给YARN,客户端返回。需要通过以下命令停止

    #优雅停止
    echo "stop" | ./bin/yarn-session.sh -id application_1626944535001_0984
    #停止
    yarn application -kill application_1626944535001_0984
    

除了通过conf/flink-conf.yaml文件传递配置,也可以使用-Dkey=value参数在提交时将任何配置传递./bin/yarn-session.sh客户端

2.3.1 在YARN上启动一个长期的Flink集群

-s参数表示每个Task Manager上可用的处理槽(processing slot)数量。建议把槽数量设置成每个机器处理器的个数。 一旦会话被启动,就可以使用./bin/flink工具提交任务到集群上。

cd flink/flink-1.13.2
./bin/yarn-session.sh -jm 2048 -tm 4096 -s 16 -d

#优雅停止
echo "stop" | ./bin/yarn-session.sh -id application_1626944535001_0984
#停止
yarn application -kill application_1626944535001_0984

# 查看可以获得的命令
echo "help" | ./bin/yarn-session.sh -id application_1626944535001_0984
 
# 查看运行的application
yarn top

2.3.1.1 查看启动的flink集群

在 Yarn Cluster页面 http://172.25..:8088/cluster/apps/RUNNING,查看 name为Flink session cluster的application, 点击Tracking UI中的ApplicationMaster链接,跳转到Flink dashboard页面


在yarn-session.sh成功运行后,日志显示如下:
在日志的末尾显示了 flink dashboard地址 及 applicationId

2.3.1.2 查看配置信息

more /tmp/.yarn-properties-root

显示内容如下:

#Generated YARN properties file
#Fri Sep 10 11:43:19 CST 2021
dynamicPropertiesString=
applicationID=application_1626944535001_1021

2.3.2 yarn-session.sh脚本参数

参数名称解释
-at,–applicationType Set a custom application type for the application on YARN
-D <property=value>use value for given property
-d,–detachedIf present, runs the job in detached mode
-h,–helpHelp for the Yarn session CLI.
-id,–applicationId Attach to running YARN session
-j,–jar Path to Flink jar file
-jm,–jobManagerMemory Memory for JobManager Container with optional unit (default: MB)
-m,–jobmanager Set to yarn-cluster to use YARN execution mode.
-nl,–nodeLabel Specify YARN node label for the YARN application
-nm,–name Set a custom name for the application on YARN
-q,–queryDisplay available YARN resources (memory, cores)
-qu,–queue Specify YARN queue.
-s,–slots Number of slots per TaskManager
-t,–ship Ship files in the specified directory (t for transfer)
-tm,–taskManagerMemory Memory per TaskManager Container with optional unit (default: MB)
-yd,–yarndetachedIf present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
-z,–zookeeperNamespace Namespace to create the Zookeeper sub-paths for high availability mode

2.3.3 提交任务

./bin/flink run ./examples/streaming/TopSpeedWindowing.jar

./bin/flink run ./examples/batch/WordCount.jar --input hdfs://pci01:8020/tmp/README.txt --output hdfs://pci01:8020/tmp/wordcount-result20.txt

./bin/flink run -t yarn-session  \\
-Dyarn.application.id=application_1626944535001_1021  \\
./examples/batch/WordCount.jar --input hdfs://pci01:8020/tmp/README.txt  \\
--output hdfs://pci01:8020/tmp/wordcount-result21.txt

3 flink on yarn配置

flink on yarn配置

以下配置参数是由Flink on YARN管理的,因为它们可能会在运行时被框架覆盖:

  • jobmanager.rpc.address: 通过Flink on YARN上动态设置为JobManager容器的地址
  • io.tmp.dirs:如果没有设置,Flink将设置YARN定义的临时目录
  • high-availability.cluster-id : 自动生成的ID,用于区分HA服务中的多个集群

如果需要将额外的Hadoop配置文件传递给Flink,可以通过HADOOP_CONF_DIR环境变量来实现,该变量接受一个包含Hadoop配置文件的目录名。默认情况下,所有必需的Hadoop配置文件都是通过HADOOP_CLASSPATH环境变量从类路径加载的。

4 Resource Allocation Behavior 资源配置行为

如果一个运行在YARN上的JobManager不能使用现有资源运行所有提交的作业,那么它将请求额外的taskManager。特别是在Session Mode下运行时,如果需要,JobManager会在提交额外的作业时分配额外的taskManager。未使用的taskManager在超时后再次释放

YARN实现将尊重JobManager和TaskManager进程的内存配置。默认情况下,报告的VCores数量等于每个TaskManager配置的槽位数量。yarn.containers.vcores允许使用自定义值覆盖vcores的数量。为了让这个参数生效,您应该在YARN集群中启用CPU scheduling

失败的容器(包括JobManager)将被YARN替换。JobManager容器重启的最大次数是通过 yarn.application-attempts[默认是1]配置的。当所有的尝试都耗尽时,YARN应用程序将失败。

5 User jars & Classpath

默认情况下,Flink在运行单个作业时将用户jar包含到系统类路径【system classpath】中。这种行为可以通过yarn.per-job-cluster.include-user-jar 参数进行控制。

当将此设置为DISABLED时,表示用户jar被排除在系统类路径之外。

用户jar在类路径中的位置可以通过将参数设置为以下参数之一来控制:

  • ORDER: (default) Adds the jar to the system classpath based on the lexicographic order.
  • FIRST: Adds the jar to the beginning of the system classpath.
  • LAST: Adds the jar to the end of the system classpath.

6 三种模式的区别

详见 deployment
主要区别点如下:

  • 集群生命周期和资源隔离保证
  • 应用程序的main()方法是在client上执行还是在cluster上执行。
  • 在session mode中,如果其中一个作业行为不当或关闭了一个TaskManager,那么在该TaskManager上运行的所有作业都将受到该失败的影响

应用程序的main方法包括的过程如下:

  • 在本地下载应用程序的依赖项,
  • 执行main()来提取Flink的运行时可以理解的应用程序的表示(即JobGraph),
  • 并将依赖项和JobGraph发送到集群

如果main在client执行,则client会成为一个沉重的资源消耗者,因为它可能需要大量的网络带宽来下载依赖项并将二进制文件发送到集群,并需要CPU周期来执行main()。当client 服务器在用户之间共享时,这个问题会更加明显。

Application Mode 与 Per-Job Mode的区别

  • Application Mode的main代码在cluster中执行,Per-Job Mode在client中执行
  • 与Per-Job模式相比,Application模式允许提交由多个作业Job组成的应用程序。

以上是关于Flink在YARN上支持的三种部署方式及使用方式的主要内容,如果未能解决你的问题,请参考以下文章

Apache Spark支持三种分布式部署方式 standalonespark on mesos和 spark on YARN区别

flink on yarn之per-job方式部署超时的一种解决方法

Flink1.13.2三种方式安装部署(建议收藏!)

kerberos环境下parcel方式部署flink1.15.3 基于CDH6.3.2 Flink on Yarn

Flink on YARN的第三种部署模式:Application Mode

Hadoop-yarn组件的三种调度器