Flink运行架构

Posted 一只懒得睁眼的猫

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink运行架构相关的知识,希望对你有一定的参考价值。

运行架构和核心组件
Flink运行时架构主要包括四个不同的组件,它们会在运行流处理应用程序时协同工作:作业管理器(JobManager)、资源管理器(ResourceManager)、

任务管理器(TaskManager),以及分发器(Dispatcher)。因为Flink是用Java和Scala实现的,所以所有组件都会运行在Java虚拟机上。每个组件的

职责如下:

任务提交流程:

1、首先用户通过flink-run命令提交应用程序,向资源管理器Cluster-Manager(Standalone、YARN、k8s)发送请求,申请启动ApplicationMaster;
2、RS收到请求之后,选择一台NM节点启动ApplicationMaster;
3、随后ApplicationMaster向资源管理器YARN进行注册并申请资源TaskManager;
4、随后资源管理器与工作节点进行通信,要求它们分配相应的container资源,并启动TaskManager;
5、TaskManager启动之后会反向注册给ApplicationMaster并申请task;
6、最后JobManager会将Task发送给TaskManager进行执行,并进行监控。

需要说明的是:ApplicationMaster部分内部包含了三个重要的组件:分发器(Dispatcher)、作业管理器(JobManager)、资源管理器(ResourceManager):
分|转发器(Dispatcher):负责接受用户提交的应用程序(作业),并启动JobManager;
资源管理器(ResourceManager):在ApplicationMaster当中负责资源的管理;
作业管理器(JobManager|Driver):负责管理作业的执行和调度,向ResourceManager申请Slot资源,根据JobGraph生成ExecutionGraph、,同时负责状态
容错、checkpoint等操作。

核心组件:

其他组件:

核心概念-TaskManager与Slots
Task Slot是静态的概念,是指TaskManager具有的并发执行能力,可以通过参数taskmanager.numberOfTaskSlots进行配置;

 Flink中的每一个TaskManager都是一个JVM进程,并会以独立的线程来执行一个或者多个subtask。为了控制一个 TaskManager 能够接收多少个task,Flink

提出了 Task Slot的概念:TaskManager通过task slot来进行控制(一个worker至少有一个task slot)。

简单的说,TaskManager会将自己节点上管理的资源分为不同的Slot:固定大小的资源子集。这样就避免了不同Job的Task互相竞争内存资源,但是需要主要的是,

Slot只会做内存的隔离,没有做CPU的隔离:假如一个TaskManager有三个slot,那么它会将其管理的内存分成三份给各个slot,但是不会涉及到CPU的隔离,

假设我们将 taskmanager.numberOfTaskSlots 这个参数配置为3,那么每一个 taskmanager 中将被分配3个 TaskSlot, 3个 taskmanager 一共有9个TaskSlot:

TaskManager–>Executor,Slot—>Core!

核心概念-并行度(Parallelism)

Flink程序在执行的过程当中,每一个算子(operator)可以包含一个或多个子任务(operator subtask),这些子任务在不同的线程、不同的物理机或不同的容器

中彼此互不依赖地执行。

在实际的生产环境中,一个任务的并行度设置可以从4个层次指定:

a、Operator Level(算子层次)

b、Execution Environment Level(执行环境层次)

c、Client Level(客户端层次)

d、System Level(系统层次)

这些并行度的优先级为Operator Level > Execution Environment Level > Client Level > System Level.

  1. 算子层次设置并行度

Flink支持针对单独的某个Operator设置并行度,一个算子、数据源和sink的并行度可以通过调用setParallelism()方法来单独指定.

  1. 执行环境层次设置并行度

执行环境层次的默认并行度可以通过调用env.setParallelism()方法来指定,这样设置的并行度是针对本Job的,假设我们想以并行度等于3来执行所有的算子、数据源

和sink,可以通过如下的方式进行设置:

  1. 客户端层次设置并行度

我们可以在客户端提交Flink job的时候指定任务的并行度,对于CLI客户端,我们可以通过-p参数来指定并行度.

flink run -p 10 WordCount.jar

  1. 系统层次设置并行度

在系统级可以通过修改flink-conf.yaml文件中的parallelism.default参数来设置全局的默认并行度,即所有执行环境的默认并行度。

并行度和Slot的几个问题

  1. slot和任务的关系是什么?一段代码执行需要多少个slot来执行?

同一个算子的并行任务,必须执行在不同的slot上
不同算子的任务,可以共享一个slot
一段代码执行需要的slot数量,就是代码中并行度最大的算子的并行度的数量。

不同任务的子任务(前提是它们来自同一个job)可以共享slot, 这样的结果是一个slot可以保存作业的整个管道。

  1. 并行度(Parallelism)和 slot 数量的关系?
    并行度:动态概念,并行度和任务有关,每一个算子拥有的并行任务的数量。

slot:静态概念,slot数量只跟TaskManager的配置有关,代表TaskManager并行处理数据的能力。

  1. 假设我们的slot一共只有9个,但是某个算子的并行度设置的是10个,会怎么样?
    应用程序不会执行,而是一直在等待中。。。。

也就是说,在整个程序当中,代码中并行度最大的算子的并行度parallelism应该 <= taskmanager.numberOfTaskSlots * TM,否则会一直等

待资源中。

核心概念-算子之间的数据传输策略
Stream在算子之间传输数据的形式可以是one-to-one(forward strategy)的模式,也可以是redistributing(重分区)的模式。

One-to-one

  1. 上游一个task的输出只发送给下游一个task作为输入;

类似于spark当中的窄依赖
Redistributing

  1. 一个 task 的输出作为多个task的输入;

例如,keyBy() 基于hashCode重分区、broadcast和rebalance会随机重新分区,这些算子都会引起redistribute过程,而redistribute过程就类似

于Spark中的shuffle过程。

类似于spark中的宽依赖,严格来讲Redistributing的范围大于宽依赖,如rebalance虽然是重分区,但是不一定发生shuffle.
总结:在Flink当中,数据传输策略有很多,具体见:https://www.cnblogs.com/sjfxwj/p/13752889.html,在这些策略当中,

除了one-to-one(forward strategy策略),其余都可以归纳为:Redistributing。

核心概念-Task和subTask

核心概念-任务链(Operator Chains)
不同算子的subtask如何才能串在一起形成Operator Chains?

对于相同并行度的one to one操作,Flink会将这样相连的算子链接在一起形成一个subtask,原来的算子成为里面的一部分。将算子链接在一起是

非常有效的优化:它能减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提升吞吐量。链接的行为可以在编程API中进行指定。

在满足条件的情况下,如何禁用Operator Chains呢?

局部方式:

DataStream<Tuple2<String, Long>> sumData = windowCounts.keyBy(0).sum(1).setParallelism(2).disableChaining();
全局方式:

    // 获取Flink的运行环境.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //全局禁用任务链
    env.disableOperatorChaining();

禁用示意图:

核心概念-执行图(ExecutionGraph)
看我:https://blog.csdn.net/u010271601/article/details/105169076

由Flink程序直接映射成的数据流图是StreamGraph,也被称为逻辑流图,因为它们表示的是计算逻辑的高级视图。为了执行一个流处理程序,Flink

需要将逻辑流图转换为物理数据流图(也叫执行图),详细说明程序的执行方式。

Flink 中的执行图可以分成四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图。

StreamGraph:是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。

JobGraph:StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一

起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。

ExecutionGraph:JobManager 根据 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据

结构。

物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据

结构。

StreamGraph和JobGraph都是在客户端生成的,给JobManager用,ExecutionGraph是在JobManager当中生成的,给TM用!

任务提交流程

Flink任务提交后,Client向HDFS上传Flink的Jar包和配置,之后向Yarn ResourceManager提交任务,ResourceManager分配Container资源并

通知对应的NodeManager启动ApplicationMaster,ApplicationMaster启动后加载Flink的Jar包和配置构建环境,然后启动JobManager,之后

ApplicationMaster向ResourceManager申请资源启动TaskManager,ResourceManager分配Container资源后,由ApplicationMaster通知

资源所在节点的NodeManager启动TaskManager,NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager,TaskManager启动

后向JobManager发送心跳包,并等待JobManager向其分配任务。

概念总结
简述flink-conf.yaml文件中几个重要的参数

jobmanager.heap.size:JobManager节点可用的内存大小

taskmanager.heap.size:TaskManager节点可用的内存大小

taskmanager.numberOfTaskSlots:每台机器可用的cpu数量

假设我们将taskmanager.numberOfTaskSlots这个参数配置为3,那么每一个taskmanager将被分配3个TaskSlot,3个 taskmanager一共有9个TaskSlot。

parallelism.default:默认情况下Flink任务的并行度

假设我们将parallelism.default 这个参数设置为1,那么9个 TaskSlot 只能用1个,有8个空闲。

简述Flink中Slot参数的概念

Slot是静态的概念,是指TaskManager具有的并发执行能力,

TaskManager是一个JVM进程,并会以独立的线程来执行一个task或者多个subtask。为了控制一个TaskManager能够接受多少个task,Flink提出了Task Slot的概念。简单来说,TaskManager会将节点上管理的资源分为不同的Slot:固定大小的资源子集。这样就避免了不同Job的Task互相竞争内存资源的问题,但是需要注意的是,Slot只会做内存的隔离,没有做CPU的隔离。

简述Flink中parallelism参数的概念

parallelism参数是动态的概念,是指taskmanager实际使用的并发能力,即Flink程序运行时实际使用的并发能力,设置合适的parallelism参数能提高运算效率。

简述在Flink当中并行度的概念?

在Flink当中,某个特定算子的并行实例(线程)的数量被称之为该算子的并行度(parallelism),在同一个程序当中,不同的算子可能具有不同的并行度。

简述Flink当中设置任务并行度的方式以及优先级?

在Flink当中有四种设置任务并行度的方式,分别是:算子层次、执行环境层次、客户端层次和系统层次来设置任务的并行度。

算子层次: 通过调用setParallelism()方法来单独指定某个算子、数据源或者sink的并行度;

执行环境层次: 通过调用env.setParallelism()方法来指定执行环境层次的默认并行度;

客户端层次: 在向Flink提交作业时,通过调用-p参数来指定客户端层次的并行度,如:flink run -p 10 WordCount.jar;

系统层次: 通过修改flink-conf.yaml文件中的parallelism.default参数来设置系统层次的全局并行度;

这四种设置任务并行度的方式优先级分别为: 算子层次、执行环境层次、客户端层次、系统层次。

简述Flink流处理当中的分区策略

分区策略是用来决定数据是如何被发送至下游的,在Flink当中常见的分区策略包括:

ShufflePartitioner:数据会被随机的分发到下游算子当中进行处理;

RebalancePartitioner:数据会被(轮询)均匀的分发到下游算子当中进行处理,从而实现负载均衡,解决数据倾斜;

RescalePartitioner:数据会根据上下游算子的并行度,被均匀的分发到下游算子当中进行处理;

BroadcastPartitioner:数据会被(分发)广播到下游算子当中的每个实例进行处理,适合于大数据集和小数据集做Join的场景。

KeyGroupStreamPartitioner:Hash分区器,分区字段相同的记录信息将被发送到下游的同一个task当中进行处理,所以如果数据分布不均匀,某一个key的条数比其它key多很多,将会导致数据大量的集中到某一个task节点,造成数据倾斜。

简述RescalePartitioner与RebalancePartitioner分区器的区别

RebalancePartitioner会产生全量重分区,而RescalePartitioner不会。

以上是关于Flink运行架构的主要内容,如果未能解决你的问题,请参考以下文章

大数据flink系列第二话(flink架构)

Apache Flink快速入门-基本架构、核心概念和运行流程

Flink入门——Flink架构介绍

Flink运行架构剖析

Flink5:Flink运行架构(Slot和并行度)

Flink运行架构