FlinkFlink运行时的架构

Posted 飝鱻.

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FlinkFlink运行时的架构相关的知识,希望对你有一定的参考价值。

Flink运行时的架构

Flink 运行时的组件

  • 运行时主要有四大组件

作业管理器(JobManager)

  • 控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager 所控制执行。

  • JobManager 会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(logical dataflow graph)和打包了所有的类、库和其它资源的JAR包。

  • JobManager 会把JobGraph转换成一个物理层面的数据流图,这个图被叫做“执行图”(ExecutionGraph),包含了所有可以并发执行的任务。

  • JobManager 会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(TaskManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager上。而在运行过程中,JobManager会负责所有需要中央协调的操作,比如说检查(checkpoints)的协调。

任务管理器(TaskManager)

  • Flink中的工作进程。通常在Flink中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots)。插槽的数量限制了TaskManager能够执行的任务数量。

  • 启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。

  • 在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManager交换数据

资源管理器(ResourceManager)

  • 主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger 插槽是Flink中定义的处理资源单元。

  • Flink为不同的环境和资源管理工具提供了不同资源管理器,比如YARN、Mesos、K8s,以及standalone部署。

  • 当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分配给JobManager。如果ResourceManager没有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。

分发器(Dispatcher)

  • 可以跨作业运行,它为应用提交提供了REST接口。

  • 当一个应用被提交执行时,分发器就会启动并将应用移交给一个JobManager。

  • Dispatcher也会启动一个Web UI,用来方便地展示和监控作业执行的信息。

  • Dispatcher在架构中可能并不是必需的,这取决于应用提交运行的方式。

跳转顶部


任务提交流程

  • 使用WEBUI来来提交的原理
  • yarn任务调度的原理

跳转顶部


任务调度原理

并行度

  • 一个特定算子的 子任务(subtask)的个数被称之为其并行度(parallelism)。一般情况下,一个 stream 的并行度,可以认为就是其所有算子中最大的并行度

TaskManager 和 Slots

  • Flink 中每一个 TaskManager 都是一个JVM进程,它可能会在独立的线程上执行一个或多个子任务

  • 为了控制一个 TaskManager 能接收多少个 task, TaskManager 通过 task slot 来进行控制(一个 TaskManager 至少有一个 slot)

  • 默认情况下,Flink 允许子任务共享 slot,即使它们是不同任务的子任务。 这样的结果是,一个 slot 可以保存作业的整个管道。

  • Task Slot 是静态的概念,是指 TaskManager 具有的并发执行能力

并行子任务的分配

  • 上述描述的是一个具有两个数据入流的数据A、C(后面的数字是每个任务的并行度)

  • 上述在没有划分slot组的时候只需要4个slot即可,因为slot是可以共享的,划分slot组只需要添加下列代码即可

  • 划分了slot组后只有同组的slot的可以共享的

  • 也就是说默认的slot数量就是看算子里面的最大的并行度

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = inputStream.flatMap(new WordCountTest01.MyFlatMapper())
                .keyBy(0)//与groupBy相似,按照key的hashCode
                .sum(1).setParallelism(3).slotSharingGroup("red");
    

程序和数据流

  • 所有的Flink程序都是由三部分组成的: Source 、Transformation 和 Sink。

  • Source 负责读取数据源,Transformation 利用各种算子进行处理加工,Sink 负责输出

  • 在运行时,Flink上运行的程序会被映射成“逻辑数据流”(dataflows),它包含了这三部分

  • 每一个dataflow以一个或多个sources开始以一个或多个sinks结束。dataflow类似于任意的有向无环图(DAG)

  • 在大部分情况下,程序中的转换运算(transformations)跟dataflow中的算子(operator)是一一对应的关系

执行图(ExecutionGraph)

  • Flink 中的执行图可以分成四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图

  • StreamGraph:是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。

  • JobGraph:StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点

  • ExecutionGraph:JobManager 根据 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。

  • 物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。

数据传输形式

  • 一个程序中,不同的算子可能具有不同的并行度

  • 算子之间传输数据的形式可以是 one-to-one (forwarding) 的模式也可以是redistributing 的模式,具体是哪一种形式,取决于算子的种类

    One-to-one:stream维护着分区以及元素的顺序(比如source和map之间)。这意味着map 算子的子任务看到的元素的个数以及顺序跟 source 算子的子任务生产的元素的个数、顺序相同。map、fliter、flatMap等算子都是one-to-one的对应关系。

    Redistributing:stream的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。例如,keyBy 基于 hashCode 重分区、而 broadcast 和 rebalance 会随机重新分区,这些算子都会引起redistribute过程,而 redistribute 过程就类似于 Spark 中的 shuffle 过程。

任务链(Operator Chains)

  • Flink 采用了一种称为任务链的优化技术,可以在特定条件下减少本地通信的开销。为了满足任务链的要求,必须将两个或多个算子设为相同的并行度,并通过本地转发(local forward)的方式进行连接

  • 相同并行度的 one-to-one 操作,Flink 这样相连的算子链接在一起形成一个 task,原来的算子成为里面的 subtask

  • 并行度相同、并且是 one-to-one 操作,两个条件缺一不可

跳转顶部


flinkFlink 批作业的运行时自适应执行管控

1.概述

转载:Flink 批作业的运行时自适应执行管控

摘要:本文整理自阿里云高级技术专家朱翥(长耕),在 FFA 核心技术专场的分享。本篇内容是关于在过去的一年中,Apache Flink 对运行时的作业执行管控进行的一些改进。

这些改进,让 Flink 可以更好的利用运行时的信息,来灵活的控制作业的执行,从而使得 Flink 批处理作业的执行可以更加的稳定、更有效率,并且更容易运维。详细内容主要分为两个部分:

自适应执行计划

同源实例的并行执行

Tips:点击「阅读原文」查看原文视频&演讲 ppt


我们先看一下,Flink 是如何描述作业的执行计划的。以这个 DataStream 作业为例,Flink 会基于它先生成一个 StreamGraph。这是一个有向无环图,图中的节点对应着计算逻辑,图中的边则对应着数据的分发方式。

Flink 会根据节点的并行度以及他们之间的连边方式,把一些计算节点进行链接合并,最终形成 JobGraph,从而降低计算节点间的数据传输开销。这个操作的目的是,是为了降低计算节点之间的数据传输开销。StreamGraph 和 JobGraph 都是在编译阶段生成的。JobGraph 会提交给 Flink Job Manager,从而启动和执行作业。

在执行作业前,Flink 会生成 ExecutionGraph。这个 ExecutionGraph 是根据 JobGraph 中的节点并行度,展开生成的。我们知道,Flink 是一个分布式计算框架。而 ExecutionGraph 的每一个节点,都对应着一个需要部署到 TaskManager 上进行执行的任务,每一条边都对应着任务的输入和输出。所以说,它是作业的物理执行计划。


这个物理执行计划,描述了任务的计算逻辑、所需资源和并行度,同时也描述任务产出数据的划分方式,此外还描述了任务对数据的依赖关系以及数据传输方式。

通过它,Flink 就能知道如何创建和调度作业的所有任务,从而完成作业的执行。

但是,如前面所说,它是在作业运行前就已经确定的,是静态的。而 Flink 难以在作业执行前,预判什么样的计划参数更合理。所以,这些执行计划参数,只能依赖用户提前指定,也就是需要手动调优。


然而,对于批作业,由于其分阶段执行的特性,在执行一个阶段前,理论上 Flink 是可以获得很多有用的信息的,比如其消费的数据量大小、这些数据的分布模式、当前的可用资源等等。

基于这些信息,我们可以让 Flink 对执行计划动态的进行调优,从而获得更好的执行效率。并且,由于 Flink 可以自动的进行这些调优,也可以让用户从手动调优中解放出来。

这就是 Flink 批处理作业的自适应执行计划。


为了支持自适应执行计划,最核心的一点,是需要一个可以动态调整的执行拓扑。所以,我们改造了 ExecutionGraph,使其支持渐进式构建。


具体的来说,就是让 ExecutionGraph 一开始只包含 Source 节点,随着执行的推进,再逐渐的加入后续的节点和连边。

这样,Flink 就有机会对尚未加入的执行节点和连边进行调整。


但在这个地方,我们遭遇了一个阻碍。因为在原来的作业执行中,上游节点执行是依赖于下游节点的并行度的。具体来说,是因为上游在产出数据时,会根据下游并行度,对数据进行划分(sub-partition);这样,每个下游任务就可以直接消费其对应的那一个数据分区。然而,在动态执行计划的场景下,下游节点的并行度是不确定的

为了解决这个问题,我们改造了节点数据的划分逻辑,使其不再根据下游节点的并行度,而是根据其最大并行度进行划分。同时,我们也改造了节点消费数据的逻辑,使其不再只消费单一分区而是可以消费一组连续的数据分区(sub-partition range)。

通过这样的方式,上游节点执行得以和下游节点的并行度解耦,动态执行拓扑也得以实现。


在支持了动态执行拓扑后,我们引入了 Adaptive Batch Scheduler 来支持自适应执行计划

与原有调度器不同的地方在于,Adaptive Batch Scheduler 会基于动态执行拓扑进行作业管控,持续收集运行时的信息,定制后续的执行计划。Flink 会基于执行计划,动态生成执行节点和连边,以此来更新执行拓扑。


在上述框架下,我们为 Flink 增加了自动决定并行度的能力。用户只需要配置希望单个执行节点处理的数据量, Flink 就可以根据该阶段需要处理的数据量,自动推导该阶段的节点并行度

相比起传统的为每个作业单独配置并行度,自动决定并行度有这些优点:

  1. 一是配置简单,无需为每个作业单独配置,一项配置可以适用于很多作业;
  2. 二是可以自动的适配每天变化的数据量,当数据量较大时,作业并行度可以大一些,从而保障作业的产出时间;
  3. 三是可以细粒度的调整作业的并行度,提高资源利用率。

但是自动决定并行度,数据可能分布不均。为了解决这个问题,我们在自动决定并行度的基础上,进行了自动均衡下发数据的改进。

这个改进会采集 sub-partition 粒度的数据量,并以此来决定执行节点的并行度,以及每个执行节点应该消费哪些分区数据。从而尽可能让下游各执行节点消费的数据,接近用户配置的预期值。

相比起自动决定并行度,这样的方式不但让下游数据量更均衡,而且能够缓解数据倾斜的影响。这个功能正在开发中,会随着 Flink 1.17 发布


以上就是我们当前已经或是即将在 Flink 中完成的自适应执行计划的改进。

不过,自适应执行计划还有更大的改进空间,比如根据 join 算子实际消费的数据量,动态决定应该用 hash join 还是 broadcast join;支持选择性执行任务,在满足特定条件下,为作业加入额外的执行分支;在 Sink 输出结果达标时提前结束作业。


此外,我们也在考虑 SQL 的动态优化能力。

当前,SQL 的查询优化是在作业编译时进行的;其只能通过 Source 的 Meta 信息,对数据量进行估算,容易导致优化结果不准确。如果可以向 SQL planner 反馈运行时信息,来动态的优化执行计划,就可以得到更好的执行效果。

2.同源实例的并行执行


接下来,讲一讲同源实例的并行执行。

同源实例是指,属于同一个执行节点的执行实例。执行拓扑是由执行节点组成,各节点会创建执行实例,将其部署到 TaskManager 上进行执行。

当前,每个执行节点在某一时刻只能有一个执行实例,只有当该实例失败(或被取消)后,节点才会创建一个新的执行实例。这也意味着,同源执行实例只能串行执行

驱动我们更改这一现状的,是来自预测执行的需求。

在生产中,热点机器是无法避免的,混部集群、密集回刷,都可能导致一台机器的负载高、IO 繁忙。其上执行的数据处理任务可能异常缓慢,导致批作业产出时间难以得到保障。


预测执行,是一种已经得到普遍的认可、用来解决这类问题的方法。

其基本思路是,为热点机器上的慢任务创建新的执行实例,并部署在正常的机器节点上这些预测执行实例和对应的原始实例,具有相同的输入和产出。其中,最先完成的实例会被承认,其他相应实例会被取消

因此,为了支持预测执行,Flink 必须支持多个同源实例并行执行。为了支持同源实例并行执行,我们进行了下列改进。

首先,我们重新梳理了执行节点的状态。

当前,执行节点的状态和其当前唯一执行实例是一一对应的。然而,如果一个节点可以同时存在多个执行实例,这样的映射方式就会出现问题。

为此,我们重新定义了执行节点与执行实例的状态映射,取执行实例中最接近 FINISH 状态的状态作为执行节点的状态。这样既可以兼容单执行实例场景,也能支持多个同源实例并行执行的场景。


其次,我们保证了 Source 的同源执行实例,总是会读取到相同的数据。

大体上来说,就是我们在框架层为每个 Source 执行节点增加一个列表,来维护分配给它的数据分片。该节点的所有执行实例都会共享这一个列表,只是会各自维护一个不同的下标,来记录其处理到的数据分片进度

这样的改动的好处是,大部分现有 Source 不需要额外的修改,就可以进行预测执行。只有当 Source 使用了自定义事件的情况下,它们才需要实现一个额外的接口,用以保证各个事件可以被分发给正确的执行实例。


在接下来的 Flink 1.17 中,我们也会支持 Sink 的同源执行实例并行执行。

其关键点在于避免不同 Sink 之间的执行冲突,特别是要避免因此产生的数据不一致,因为 Sink 是需要向外部系统进行写入的。

由于 Sink 的写入逻辑隐藏在各个 Sink 的实现中,我们无法像 Source 一样在框架层统一避免写入冲突。所以我们向 Sink 层暴露了执行实例标识(attemptNumber),使 Sink 可以自行避免冲突

同时为了安全起见,我们默认不会允许 Sink 的同源执行实例并行执行,除非 Sink 显式声明支持同源执行实例并行执行


在此基础上,我们为 Flink引入了预测执行机制。主要包括三个核心组件。

首先是慢任务检测器。它会定期进行检测,综合任务处理的数据量,以及其执行时长,评判任务是否是慢任务。当发现慢任务时,它会通知给批处理调度器

其次是批处理调度器。在收到慢任务通知时,它会通知黑名单处理器,对慢任务所在的机器进行加黑。并且,只要慢任务同源执行的实例数量,没有超过用户配置上限,它会为其拉起并部署新的执行实例。当任意执行实例成功完成时,调度器会取消掉其他的同源执行实例。

最后是黑名单处理器。Flink 可以利用其加黑机器。当机器节点被加黑后,后续的任务不会被部署到该机器。为了支持预测执行,我们支持了软加黑的方式,即加黑机器上已经存在的任务,可以继续执行而不会因为加黑被取消掉


除此之外,工作人员对外部 UI 进行改进,方便展示当前运行中的所有同源执行实例,用户可以更好的判断预测执行的执行结果。

此外,我们对 WebUI 也进行了改进,使其能够展示当前运行中,或是作业结束时的所有同源执行实例,用户可以更好的判断预测执行的执行结果。此外,UI 也能展示被加黑的 Slot 和 TaskManager。

需要说明的是,虽然出发点是支持批作业的预测执行。同源执行实例的并行执行,也为流作业的任务平滑迁移提供了可能。

当流作业有任务落在慢机器上时,我们也可能先为其预先拉起一个同源执行实例,待该实例的部署和初始化完成后,通过直接切换数据连边,可以达成低断流的任务迁移。配合慢任务检测、黑名单等能力,我们甚至能让 Flink 自动的进行慢任务迁移。

以上是关于FlinkFlink运行时的架构的主要内容,如果未能解决你的问题,请参考以下文章

FlinkFlink SQL 架构 以及 执行逻辑

Flink入门——Flink架构介绍

Flink运行架构-运行组件介绍

flinkFlink 批作业的运行时自适应执行管控

数据湖架构HudiHudi集成Flink案例详解

flink在企业IT架构中如何定位-在选型流批一体技术与大数据架构时的避坑指南