Flink on YARN时,如何确定TaskManager数

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink on YARN时,如何确定TaskManager数相关的知识,希望对你有一定的参考价值。

参考技术A 答案写在最前面:Job的最大并行度除以每个TaskManager分配的任务槽数。

在 Flink 1.5 Release Notes 中,有这样一段话,直接上截图。

这说明从1.5版本开始,Flink on YARN时的容器数量——亦即TaskManager数量——将由程序的并行度自动推算,也就是说flink run脚本的-yn/--yarncontainer参数不起作用了。那么自动推算的规则是什么呢?要弄清楚它,先来复习Flink的并行度(Parallelism)和任务槽(Task Slot)。

与Spark类似地,一个Flink Job在生成执行计划时也划分成多个Task。Task可以是Source、Sink、算子或算子链(算子链有点意思,之后会另写文章详细说的)。Task可以由多线程并发执行,每个线程处理Task输入数据的一个子集。而并发的数量就称为Parallelism,即并行度。

Flink程序中设定并行度有4种级别,从低到高分别为:算子级别、执行环境(ExecutionEnvironment)级别、客户端(命令行)级别、配置文件(flink-conf.yaml)级别。实际执行时,优先级则是反过来的,算子级别最高。简单示例如下。

Flink运行时由两个组件组成:JobManager与TaskManager,与Spark Standalone模式下的Master与Worker是同等概念。从官网抄来的图如下所示,很容易理解。

JobManager和TaskManager本质上都是JVM进程。为了提高Flink程序的运行效率和资源利用率,Flink在TaskManager中实现了任务槽(Task Slot)。任务槽是Flink计算资源的基本单位,每个任务槽可以在同一时间执行一个Task,而TaskManager可以拥有一个或者多个任务槽。

任务槽可以实现TaskManager中不同Task的资源隔离,不过是逻辑隔离,并且只隔离内存,亦即在调度层面认为每个任务槽“应该”得到taskmanager.heap.size的N分之一大小的内存。CPU资源不算在内。

TaskManager的任务槽个数在使用flink run脚本提交on YARN作业时用-ys/--yarnslots参数来指定,另外在flink-conf.yaml文件中也有默认值taskManager.numberOfTaskSlots。一般来讲,我们设定该参数时可以将它理解成一个TaskManager可以利用的CPU核心数,因此也要根据实际情况(集群的CPU资源和作业的计算量)来确定。

以Flink自带示例中简化的WordCount程序为例:

用--yarnslots 3参数来执行,即每个TaskManager分配3个任务槽。TaskManager、任务槽和任务的分布将如下图所示,方括号内的数字为并行线程的编号。

由图中可以看出,由于算子链机制的存在,KeyAgg与Sink操作链接在了一起,作为一个Task来执行。

Flink允许任务槽共享,即来自同一个Job的不同Task的Sub-Task(理解为Task的子集就行)进入同一个槽位,因此在图中也可以见到任务槽X中同时存在FlatMap[X]与KeyAgg[X]+Sink[X]。任务槽共享有两点好处:

所以,可以得出Flink on YARN时,TaskManager的数量就是:max(parallelism) / yarnslots(向上取整)。例如,一个最大并行度为20,每个TaskManager有两个任务槽的作业,就会启动10个TaskManager,如Web UI所示。

参考: https://www.jianshu.com/p/5b670d524fa5

Flink On Yarn如何查看任务日志

参考技术A 无论Flink还是Spark都支持自建集群(standalone cluster)。但是为了保证稳定性和资源隔离等,生产环境里的任务最好借助资源管理框架(如Yarn)运行。任务运行在yarn上,查询日志就可能不是很方便,尤其是任务进程异常退出之后。

yarn容器退出之后,默认是不保存日志的。所以需要开启JobHistoryServer,具体方法网上有很多教程。

运行中的Flink任务可以直接通过flink web ui查看:

对于已经结束的yarn应用,flink进程已经退出无法提供webui服务。所以需要通过JobHistoryServer查看保留在yarn上的日志。

好像没有task manager的日志啊,怎么办?我们可以先研究一下job manager的日志url: http://node5:19888/jobhistory/logs//node2:8041/container_1634207619484_0505_01_000001/container_1634207619484_0505_01_000001/root ,可以发现一些规律:

所以我们只需要知道taskmanager的 容器名 和 node 就能找到它的日志了。flink在jobmanager的日志中记录了详细的相关信息,包括所有的taskmanager的 容器名 和 node 。日志太多可能不好找,这里有一个小技巧:利用appid去搜索日志,比如本文中的例子,可以搜索 1634207619484_0505

分析这行日志:taskmanager只有一个,并且它的容器名为 container_1634207619484_0505_01_000002 ,node为 node1 (注意:后面拼接url用的端口号是8041不是37798)
最终我们得的taskmanager日志的url是: http://node5:19888/jobhistory/logs/node1:8041/container_1634207619484_0505_01_000002/container_1634207619484_0505_01_000002/root

运行中的flink/spark的日志查看非常容易,因为它们本身都提供了web ui服务。但是当任务异常退出之后,flink/spark进程的结束导致无法提供web ui服务。我们利用job history server来保留和展示当时的日志。但是yarn的web只展示了flink job manager/spark driver的日志链接,我们需要自己拼接flink task manager/spark executor日志链接。

最后我有一个小疑问:文中介绍的URL组成是推测出来的,其中第三部分 /container_1634207619484_0505_01_000001/container_1634207619484_0505_01_000001 是两个同样的容器名,这是为什么?希望知道的小伙伴能留言解惑一下。

相关链接:
Flink On Yarn如何查看任务日志
Spark On Yarn如何查看任务日志

以上是关于Flink on YARN时,如何确定TaskManager数的主要内容,如果未能解决你的问题,请参考以下文章

Flink On Yarn如何查看任务日志

flinkFlink 1.12.2 源码浅析 : yarn-per-job模式解析 TaskMasger 启动

Flink on Zeppelin 系列之:Yarn Application 模式支持

Flink1.6系列之—Flink on yarn流程详解

flink on yarn之per-job方式部署超时的一种解决方法

[问题踩坑]Flink 1.11.1 on Yarn Application模式时,JobId始终为00000000000000000000000000000000