FlinkFlink运行时的架构
Posted 飝鱻.
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FlinkFlink运行时的架构相关的知识,希望对你有一定的参考价值。
Flink运行时的架构
Flink 运行时的组件
- 运行时主要有四大组件
作业管理器(JobManager)
-
控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager 所控制执行。
-
JobManager 会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(logical dataflow graph)和打包了所有的类、库和其它资源的JAR包。
-
JobManager 会把JobGraph转换成一个物理层面的数据流图,这个图被叫做“执行图”(ExecutionGraph),包含了所有可以并发执行的任务。
-
JobManager 会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(TaskManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager上。而在运行过程中,JobManager会负责所有需要中央协调的操作,比如说检查(checkpoints)的协调。
任务管理器(TaskManager)
-
Flink中的工作进程。通常在Flink中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots)。插槽的数量限制了TaskManager能够执行的任务数量。
-
启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。
-
在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManager交换数据
资源管理器(ResourceManager)
-
主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger 插槽是Flink中定义的处理资源单元。
-
Flink为不同的环境和资源管理工具提供了不同资源管理器,比如YARN、Mesos、K8s,以及standalone部署。
-
当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分配给JobManager。如果ResourceManager没有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。
分发器(Dispatcher)
-
可以跨作业运行,它为应用提交提供了REST接口。
-
当一个应用被提交执行时,分发器就会启动并将应用移交给一个JobManager。
-
Dispatcher也会启动一个Web UI,用来方便地展示和监控作业执行的信息。
-
Dispatcher在架构中可能并不是必需的,这取决于应用提交运行的方式。
任务提交流程
- 使用WEBUI来来提交的原理
- yarn任务调度的原理
任务调度原理
并行度
- 一个特定算子的 子任务(subtask)的个数被称之为其并行度(parallelism)。一般情况下,一个 stream 的并行度,可以认为就是其所有算子中最大的并行度。
TaskManager 和 Slots
-
Flink 中每一个 TaskManager 都是一个JVM进程,它可能会在独立的线程上执行一个或多个子任务
-
为了控制一个 TaskManager 能接收多少个 task, TaskManager 通过 task slot 来进行控制(一个 TaskManager 至少有一个 slot)
-
默认情况下,Flink 允许子任务共享 slot,即使它们是不同任务的子任务。 这样的结果是,一个 slot 可以保存作业的整个管道。
-
Task Slot 是静态的概念,是指 TaskManager 具有的并发执行能力
并行子任务的分配
-
上述描述的是一个具有两个数据入流的数据A、C(后面的数字是每个任务的并行度)
-
上述在没有划分slot组的时候只需要4个slot即可,因为slot是可以共享的,划分slot组只需要添加下列代码即可
-
划分了slot组后只有同组的slot的可以共享的
-
也就是说默认的slot数量就是看算子里面的最大的并行度
SingleOutputStreamOperator<Tuple2<String, Integer>> result = inputStream.flatMap(new WordCountTest01.MyFlatMapper()) .keyBy(0)//与groupBy相似,按照key的hashCode .sum(1).setParallelism(3).slotSharingGroup("red");
程序和数据流
-
所有的Flink程序都是由三部分组成的: Source 、Transformation 和 Sink。
-
Source 负责读取数据源,Transformation 利用各种算子进行处理加工,Sink 负责输出
-
在运行时,Flink上运行的程序会被映射成“逻辑数据流”(dataflows),它包含了这三部分
-
每一个dataflow以一个或多个sources开始以一个或多个sinks结束。dataflow类似于任意的有向无环图(DAG)
-
在大部分情况下,程序中的转换运算(transformations)跟dataflow中的算子(operator)是一一对应的关系
执行图(ExecutionGraph)
-
Flink 中的执行图可以分成四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图
-
StreamGraph:是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。
-
JobGraph:StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点
-
ExecutionGraph:JobManager 根据 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
-
物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。
数据传输形式
-
一个程序中,不同的算子可能具有不同的并行度
-
算子之间传输数据的形式可以是 one-to-one (forwarding) 的模式也可以是redistributing 的模式,具体是哪一种形式,取决于算子的种类
One-to-one:stream维护着分区以及元素的顺序(比如source和map之间)。这意味着map 算子的子任务看到的元素的个数以及顺序跟 source 算子的子任务生产的元素的个数、顺序相同。map、fliter、flatMap等算子都是one-to-one的对应关系。
Redistributing:stream的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。例如,keyBy 基于 hashCode 重分区、而 broadcast 和 rebalance 会随机重新分区,这些算子都会引起redistribute过程,而 redistribute 过程就类似于 Spark 中的 shuffle 过程。
任务链(Operator Chains)
-
Flink 采用了一种称为任务链的优化技术,可以在特定条件下减少本地通信的开销。为了满足任务链的要求,必须将两个或多个算子设为相同的并行度,并通过本地转发(local forward)的方式进行连接
-
相同并行度的 one-to-one 操作,Flink 这样相连的算子链接在一起形成一个 task,原来的算子成为里面的 subtask
-
并行度相同、并且是 one-to-one 操作,两个条件缺一不可
flinkFlink 批作业的运行时自适应执行管控
1.概述
摘要:本文整理自阿里云高级技术专家朱翥(长耕),在 FFA 核心技术专场的分享。本篇内容是关于在过去的一年中,Apache Flink 对运行时的作业执行管控进行的一些改进。
这些改进,让 Flink 可以更好的利用运行时的信息,来灵活的控制作业的执行,从而使得 Flink 批处理作业的执行可以更加的稳定、更有效率,并且更容易运维。详细内容主要分为两个部分:
自适应执行计划
同源实例的并行执行
Tips:点击「阅读原文」查看原文视频&演讲 ppt
我们先看一下,Flink 是如何描述作业的执行计划的。以这个 DataStream 作业为例,Flink 会基于它先生成一个 StreamGraph。这是一个有向无环图,图中的节点对应着计算逻辑,图中的边则对应着数据的分发方式。
Flink 会根据节点的并行度以及他们之间的连边方式,把一些计算节点进行链接合并,最终形成 JobGraph,从而降低计算节点间的数据传输开销。这个操作的目的是,是为了降低计算节点之间的数据传输开销。StreamGraph 和 JobGraph 都是在编译阶段生成的。JobGraph 会提交给 Flink Job Manager,从而启动和执行作业。
在执行作业前,Flink 会生成 ExecutionGraph。这个 ExecutionGraph 是根据 JobGraph 中的节点并行度,展开生成的。我们知道,Flink 是一个分布式计算框架。而 ExecutionGraph 的每一个节点,都对应着一个需要部署到 TaskManager 上进行执行的任务,每一条边都对应着任务的输入和输出。所以说,它是作业的物理执行计划。
这个物理执行计划,描述了任务的计算逻辑、所需资源和并行度,同时也描述任务产出数据的划分方式,此外还描述了任务对数据的依赖关系以及数据传输方式。
通过它,Flink 就能知道如何创建和调度作业的所有任务,从而完成作业的执行。
但是,如前面所说,它是在作业运行前就已经确定的,是静态的。而 Flink 难以在作业执行前,预判什么样的计划参数更合理。所以,这些执行计划参数,只能依赖用户提前指定,也就是需要手动调优。
然而,对于批作业,由于其分阶段执行的特性,在执行一个阶段前,理论上 Flink 是可以获得很多有用的信息的,比如其消费的数据量大小、这些数据的分布模式、当前的可用资源等等。
基于这些信息,我们可以让 Flink 对执行计划动态的进行调优,从而获得更好的执行效率。并且,由于 Flink 可以自动的进行这些调优,也可以让用户从手动调优中解放出来。
这就是 Flink 批处理作业的自适应执行计划。
为了支持自适应执行计划,最核心的一点,是需要一个可以动态调整的执行拓扑。所以,我们改造了 ExecutionGraph,使其支持渐进式构建。
具体的来说,就是让 ExecutionGraph 一开始只包含 Source 节点,随着执行的推进,再逐渐的加入后续的节点和连边。
这样,Flink 就有机会对尚未加入的执行节点和连边进行调整。
但在这个地方,我们遭遇了一个阻碍
。因为在原来的作业执行中,上游节点执行是依赖于下游节点的并行度的。具体来说,是因为上游在产出数据时,会根据下游并行度,对数据进行划分(sub-partition)
;这样,每个下游任务就可以直接消费其对应的那一个数据分区
。然而,在动态执行计划的场景下,下游节点的并行度是不确定的
。
为了解决这个问题,我们改造了节点数据的划分逻辑,使其不再根据下游节点的并行度,而是根据其最大并行度进行划分
。同时,我们也改造了节点消费数据的逻辑,使其不再只消费单一分区
,而是可以消费一组连续的数据分区(sub-partition range)。
通过这样的方式,上游节点执行得以和下游节点的并行度解耦
,动态执行拓扑也得以实现。
在支持了动态执行拓扑后,我们引入了 Adaptive Batch Scheduler 来支持自适应执行计划
。
与原有调度器不同的地方在于,Adaptive Batch Scheduler 会基于动态执行拓扑进行作业管控,持续收集运行时的信息,定制后续的执行计划
。Flink 会基于执行计划,动态生成执行节点和连边,以此来更新执行拓扑。
在上述框架下,我们为 Flink 增加了自动决定并行度的能力
。用户只需要配置希望单个执行节点处理的数据量, Flink 就可以根据该阶段需要处理的数据量,自动推导该阶段的节点并行度
。
相比起传统的为每个作业单独配置并行度,自动决定并行度有这些优点:
- 一是配置简单,无需为每个作业单独配置,一项配置可以适用于很多作业;
- 二是可以自动的适配每天变化的数据量,当数据量较大时,作业并行度可以大一些,从而保障作业的产出时间;
- 三是可以细粒度的调整作业的并行度,提高资源利用率。
但是自动决定并行度,数据可能分布不均
。为了解决这个问题,我们在自动决定并行度的基础上,进行了自动均衡下发数据的改进。
这个改进会采集 sub-partition 粒度的数据量
,并以此来决定执行节点的并行度,以及每个执行节点应该消费哪些分区数据。从而尽可能让下游各执行节点消费的数据,接近用户配置的预期值。
相比起自动决定并行度,这样的方式不但让下游数据量更均衡,而且能够缓解数据倾斜的影响。这个功能正在开发中,会随着 Flink 1.17 发布
。
以上就是我们当前已经或是即将在 Flink 中完成的自适应执行计划的改进。
不过,自适应执行计划还有更大的改进空间,比如根据 join 算子实际消费的数据量,动态决定应该用 hash join 还是 broadcast join;支持选择性执行任务,在满足特定条件下,为作业加入额外的执行分支;在 Sink 输出结果达标时提前结束作业。
此外,我们也在考虑 SQL 的动态优化能力。
当前,SQL 的查询优化是在作业编译时进行的;其只能通过 Source 的 Meta 信息,对数据量进行估算,容易导致优化结果不准确。如果可以向 SQL planner 反馈运行时信息,来动态的优化执行计划,就可以得到更好的执行效果。
2.同源实例的并行执行
接下来,讲一讲同源实例的并行执行。
同源实例是指,属于同一个执行节点的执行实例
。执行拓扑是由执行节点组成,各节点会创建执行实例,将其部署到 TaskManager 上进行执行。
当前,每个执行节点在某一时刻只能有一个执行实例,只有当该实例失败(或被取消)后,节点才会创建一个新的执行实例。这也意味着,同源执行实例只能串行执行
。
驱动我们更改这一现状的,是来自预测执行的需求。
在生产中,热点机器是无法避免的,混部集群、密集回刷,都可能导致一台机器的负载高、IO 繁忙。其上执行的数据处理任务可能异常缓慢,导致批作业产出时间难以得到保障。
预测执行
,是一种已经得到普遍的认可、用来解决这类问题的方法。
其基本思路是,为热点机器上的慢任务创建新的执行实例,并部署在正常的机器节点上
。这些预测执行实例和对应的原始实例,具有相同的输入和产出。其中,最先完成的实例会被承认,其他相应实例会被取消
。
因此,为了支持预测执行,Flink 必须支持多个同源实例并行执行
。为了支持同源实例并行执行,我们进行了下列改进。
首先,我们重新梳理了执行节点的状态。
当前,执行节点的状态和其当前唯一执行实例是一一对应的。然而,如果一个节点可以同时存在多个执行实例,这样的映射方式就会出现问题。
为此,我们重新定义了执行节点与执行实例的状态映射,取执行实例中最接近 FINISH 状态的状态作为执行节点的状态。这样既可以兼容单执行实例场景,也能支持多个同源实例并行执行的场景。
其次,我们保证了 Source 的同源执行实例,总是会读取到相同的数据。
大体上来说,就是我们在框架层为每个 Source 执行节点增加一个列表,来维护分配给它的数据分片。该节点的所有执行实例都会共享这一个列表,只是会各自维护一个不同的下标,来记录其处理到的数据分片进度
。
这样的改动的好处是,大部分现有 Source 不需要额外的修改,就可以进行预测执行。只有当 Source 使用了自定义事件的情况下,它们才需要实现一个额外的接口,用以保证各个事件可以被分发给正确的执行实例。
在接下来的 Flink 1.17 中,我们也会支持 Sink 的同源执行实例并行执行。
其关键点在于避免不同 Sink 之间的执行冲突,特别是要避免因此产生的数据不一致,因为 Sink 是需要向外部系统进行写入的。
由于 Sink 的写入逻辑隐藏在各个 Sink 的实现中,我们无法像 Source 一样在框架层统一避免写入冲突。所以我们向 Sink 层暴露了执行实例标识(attemptNumber),使 Sink 可以自行避免冲突
。
同时为了安全起见,我们默认不会允许 Sink 的同源执行实例并行执行,除非 Sink 显式声明支持同源执行实例并行执行
。
在此基础上,我们为 Flink引入了预测执行机制。主要包括三个核心组件。
首先是慢任务检测器
。它会定期进行检测
,综合任务处理的数据量,以及其执行时长,评判任务是否是慢任务。当发现慢任务时,它会通知给批处理调度器
。
其次是批处理调度器
。在收到慢任务通知时,它会通知黑名单处理器
,对慢任务所在的机器进行加黑
。并且,只要慢任务同源执行的实例数量,没有超过用户配置上限,它会为其拉起并部署新的执行实例
。当任意执行实例成功完成时,调度器会取消掉其他的同源执行实例。
最后是黑名单处理器
。Flink 可以利用其加黑机器。当机器节点被加黑后,后续的任务不会被部署到该机器。为了支持预测执行,我们支持了软加黑
的方式,即加黑机器上已经存在的任务,可以继续执行而不会因为加黑被取消掉
。
除此之外,工作人员对外部 UI 进行改进,方便展示当前运行中的所有同源执行实例,用户可以更好的判断预测执行的执行结果。
此外,我们对 WebUI 也进行了改进,使其能够展示当前运行中,或是作业结束时的所有同源执行实例,用户可以更好的判断预测执行的执行结果。此外,UI 也能展示被加黑的 Slot 和 TaskManager。
需要说明的是,虽然出发点是支持批作业的预测执行。同源执行实例的并行执行,也为流作业的任务平滑迁移提供了可能。
当流作业有任务落在慢机器上时,我们也可能先为其预先拉起一个同源执行实例,待该实例的部署和初始化完成后,通过直接切换数据连边,可以达成低断流的任务迁移。配合慢任务检测、黑名单等能力,我们甚至能让 Flink 自动的进行慢任务迁移。
以上是关于FlinkFlink运行时的架构的主要内容,如果未能解决你的问题,请参考以下文章