Flink 大规模作业调度性能优化

Posted Apache Flink

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 大规模作业调度性能优化相关的知识,希望对你有一定的参考价值。

摘要:本文作者洪志龙(柏星)& 朱翥(长耕),分享了如何在 Flink 1.13 版本和 1.14 版本中对 Flink 调度大规模作业的性能进行了优化。主要内容包括:

  1. 性能测评结果
  2. 基于拓扑结构的优化
  3. 优化任务部署
  4. 针对 Pipelined Region 构建的优化


Tips:点击「阅读原文」查看 FFA 2021 视频回放~

随着 Flink 流批一体架构不断演进和升级,越来越多的用户开始选择用 Flink 来同时承载实时和离线的业务。离线业务和实时业务有一定差异性,其中比较关键的一点是 —— 离线作业的规模通常都远远大于实时作业。超大规模的流批作业对 Flink 的调度性能提出了新的挑战。在基于 Flink 1.12 版本部署大规模流批作业时,用户可能会遇到以下瓶颈:


  1. 需要很长时间才能完成作业的调度和部署;

  2. 需要大量内存来存储作业的执行拓扑图以及部署时所需的临时变量,并且在运行过程中会出现频繁的长时间 GC,影响集群稳定性;

经测试,对于一个并发度为 10k 的 word count 作业,在其部署时 JobManager 需要 30 GiB 内存,并且从提交作业到所有任务节点部署完毕所需的总时间长达 4 分钟。

此外,对于大规模作业,任务部署的过程可能会长时间阻塞 JobManager 的主线程。当主线程阻塞时,JobManager 无法响应任何来自 TaskManager 的请求。这会使得 TaskManager 心跳超时进而导致作业出错失败。在最坏的情况下,作业从故障恢复 (Failover) 并进行新一轮部署时又会出现心跳超时,从而导致作业一直卡在部署阶段无法正常运行。

为了优化 Flink 调度大规模作业的性能,我们在 Flink 1.13 版本和 1.14 版本进行了以下优化:


  1. 针对拓扑结构引入分组概念,优化与拓扑相关的计算逻辑,主要包括作业初始化、Task 调度以及故障恢复时计算需要重启的 Task 节点等等。与此同时,该优化降低了执行拓扑占用的内存空间;

  2. 引入缓存机制优化任务部署,优化后部署速度更快且所需内存更少;

  3. 基于逻辑拓扑和执行拓扑的特性进行优化以加快 Pipelined Region 的构建速度,从而降低作业初始化所需的时间。

一、性能评测结果


为了评估优化的效果,我们对 Flink 1.12 (优化前) 和 Flink 1.14 (优化后) 进行了对比测试。测试作业包含两个节点,由全连接边相连,并发度均为 10k。为了通过 blob 服务器分发 ShuffleDescriptor,我们将配置项 blob.offload.minsize 的值修改为 100 KiB。该配置项指定了通过 blob 服务器传输数据的最小阈值,大小超过该阈值的数据将会通过 Blob 服务器进行传输。该配置项的默认值为 1 MiB,而测试作业中节点的 ShuffleDescriptor 大小约为 270 KiB。测试结果如表 1 所示:

表 1 Flink 1.12 和 1.14 各流程时间对比


1.12

1.14

时间降低百分比(%)

作业初始化

11,431ms

627ms

94.51%

任务部署

63,118ms

17,183ms

72.78%

故障恢复时计算重启节点

37,195ms

170ms

99.55%


除了时间大幅缩短以外,内存占用也明显降低。在 Flink 1.12 版本上运行测试作业时,JobManager 需要 30 GiB 内存才能保证作业稳定运行,而在 Flink 1.14 版本上只需要 2 GiB 即可。与此同时,GC 情况也得以改善。在 1.12 版本上,测试作业在初始化和 Task 部署的过程中都会出现超过 10 秒的长 GC,而在 1.14 版本上均未出现,这意味着心跳超时等问题出现的概率更低,作业运行更为稳定。

在 1.12 版本上,除去申请资源的时间,测试作业需要至少 4 分钟才能部署完成。而作为对比,在 1.14 版本上,除去申请资源的时间,测试作业在 30 秒内即可完成部署并开始运行。整体所需时间降低了 87%。鉴于此,对于需要部署运行大规模作业的用户,建议将 Flink 版本升级至 1.14 以提升作业调度和部署性能。

在接下来的部分中我们将进一步介绍各项优化的细节。

二、基于拓扑结构的优化


在 Flink 中,分发模式 (Distribution Pattern) 描述了上游节点与下游节点连接的方式,上游节点计算的结果会按照连边分发到下游节点。目前 Flink 中有两种分发模式:点对点 (Pointwise) 和全连接 (All-to-all)。如图 1 所示,当分发模式为点对点时,遍历所有边的计算复杂度为 O(N);当分发模式为全连接时,所有下游节点与上游节点都有连边,遍历所有边的计算复杂度为 O(N2),所需时间会随着规模增大而迅速增长。


图 1 目前 Flink 的两种分发模式


Flink 1.12 版本使用执行拓扑边 (ExecutionEdge) 存储任务节点间连接的信息。当分发模式为全连接模式时,节点间一共会有 O(N2) 条边相连,当作业规模较大时会占用大量内存。对于两个全连接边相连且并发度为 10k 的节点,其连边数量为 1 亿,总共需要超过 4 GiB 内存来存储这些连边。在生产作业中可能会有多个全连接边相连的节点,这也就意味着随着作业规模的增长,所需内存也会大幅增长。

从图 1 可以看到,对于全连接边相连的任务节点,所有上游节点所产生的结果分区 (Result Partition) 都是同构的,也就是说这些结果分区所连接的下游任务节点都是完全相同的。全连接边相连的所有下游节点也都是同构的,因为其所消费的上游分区都是相同的。鉴于节点间的 JobEdge 只有一种分发模式,我们可以按照分发模式对上游分区以及下游节点进行分组。

对于全连接边,由于其所有下游节点都是同构的,我们可以将这些下游节点划分为一组,称为节点组 (ConsumerVertexGroup),全连接边相连的所有上游分区都与这个组连接。同样,所有同构的上游分区也被划分为同一组,称为分区组 (ConsumedPartitionGroup),全连接边相连的所有下游节点都与这个组相连。优化方案的基本思路为:将所有消费相同结果分区的下游节点放入同一个节点组中,同时将所有与相同下游节点相连的结果分区放入同一个分区组中,如图 2 所示。


图 2 两种分发模式下如何对结果分区和任务节点进行分组

在调度任务节点时,Flink 需要遍历每一个上游分区和下游节点间的所有连边。在优化前,由于连边的总数量为 O(N2),因此将所有边遍历一遍的总时间复杂度为 O(N2)。优化后,执行拓扑边被分组的概念所替代。鉴于所有同构的分区都连接到同一个下游节点组,当 Flink 需要遍历所有连边时,只需要将该节点组遍历一遍即可,不需要重复遍历所有节点,这样就使得计算复杂度从 O(N2) 降到 O(N)。

对于点对点的分发模式,上游结果分区与下游节点逐一相连,因此分区组和节点组之间也是点对点相连,分组的数量级和执行拓扑边的数量级是一样的,也就是说,遍历所有连边的计算复杂度依旧是 O(N)。

对于上文我们提到的 word count 作业,采用上述的分组方式取代执行拓扑边可以将执行拓扑的内存占用从 4 GiB 降至 12 MiB 左右。基于分组的概念,我们对作业初始化、任务调度以及故障恢复时计算需要重启的节点等耗时较长的计算逻辑进行了优化。这些计算逻辑都涉及到对上下游之间所有连边进行遍历的操作。在优化后,其计算复杂度都从 O(N2) 降为 O(N)。

三、优化任务部署


对于 Flink 1.12 版本,当大规模作业内包含全连接边时,部署所有节点需要花费很长时间。此外,在部署过程中容易出现 TaskManager 心跳超时的情况,进而导致集群不稳定。

目前任务部署包含以下几个阶段:


  1. JobManager 在主线程内为每一个 Task 创建任务部署描述符 (TaskDeploymentDescriptor,以下简称 TDD);

  2. JobManager 在异步线程内将这些 TDD 进行序列化;

  3. JobManager 通过 RPC 通信将序列化后的 TDD 发送至 TaskManager;

  4. TaskManager 基于 TDD 创建任务并执行。

TDD 包含了 TaskManager 创建任务 (Task) 时所需的所有信息。当任务部署开始时,JobManager 会在主线程内为所有任务节点创建 TDD。在创建过程中 JobManager 无法响应任何其他请求。对于大规模作业,这一过程可能会导致 JobManager 主线程长时间被阻塞,进一步导致心跳超时,从而触发作业故障。

鉴于任务部署时所有 TDD 都是由 JobManager 负责发送至各 TaskManager,这导致 JobManager 可能会成为性能瓶颈。尤其是对于大规模作业,部署时产生的 TDD 会占用大量内存空间,导致频繁的长时间 GC,进一步加重 JobManager 的负担。

因此,我们需要缩短创建 TDD 所需的时间,避免心跳超时的发生。此外,如果能够缩减 TDD 的大小,网络传输所需的时间也会缩短,这样可以进一步加快任务部署的速度。

3.1 为 ShuffleDescriptor 添加缓存机制


ShuffleDescriptor 用于描述任务在运行时需要消费的上游结果分区的所有信息。当作业规模较大时,ShuffleDescriptor 可能是 TDD 中所占空间最大的一部分。对于全连接边相连的节点,当上游节点和下游节点的并发度都是 N 时,每一个下游节点需要消费 N 个上游结果分区,此时 ShuffleDescriptor 的总数量是 N2。也就是说,计算所有节点的 ShuffleDescriptor 的时间复杂度为 O(N2)。

然而,对于同构的下游节点来说,他们所消费的上游结果分区是完全一样的,因此部署时所需要的 ShuffleDescriptor 内容也是一样的。鉴于此,在部署时不需要为每一个下游节点重复计算 ShuffleDescriptor,只需要将计算好的 ShuffleDescriptor 放入缓存以供复用即可。这样计算 TDD 的时间复杂度就可以从 O(N2) 降至 O(N)。

为了缩减 RPC 消息的大小,进而降低网络传输的开销,我们可以对 ShuffleDescriptor 进行压缩。对于上文我们提到的 word count 作业,当节点并发度为 10k 时,每一个下游节点都会有 10k 个 ShuffleDescriptor,在压缩后其序列化值的总大小降低了 72%。

3.2 通过 Blob 服务器分发 ShuffleDescriptor


Blob (Binary Large Object) 以二进制数据的形式存储大型文件。Flink 通过 blob 服务器在 JobManager 和 TaskManager 之间传输体积较大的文件。当 JobManager 需要将大文件传输至 TaskManager 时,它可以将文件传输至 blob 服务器 (同时会将文件传输至分布式文件系统),并且获得访问文件所需的 token。当 TaskManager 获取到 token 时,它们会从分布式文件系统 (Distributed File System,DFS) 下载文件。TaskManager 会同时将文件存储到本地 blob 缓存中方便之后重复读取。

在任务部署的过程中,JobManager 负责将 ShuffleDescriptor 通过 RPC 消息分发到对应的 TaskManager 中。在发送完成后,RPC 消息会被垃圾回收器回收处理。但当 JobManager 创建 RPC 消息的速度大于发送的速度时,RPC 消息会逐渐堆积在内存中并且对 GC 造成影响,频繁触发长时间的 GC。这些 GC 会导致 JobManager 停摆,进一步拖慢任务部署的速度。

为了解决这个问题,Flink 可以通过 blob 服务器来分发大体积的 ShuffleDescriptor。首先 JobManager 将 ShuffleDescriptor 发送至 blob 服务器,而 blob 服务器会将 ShuffleDescriptor 存储至 DFS 中,TaskManager 在开始处理 TDD 时会从 DFS 下载数据。这样 JobManager 不需要将所有 ShuffleDescriptor 始终存储在内存中直至对应的 RPC 消息发出。经过优化后,在部署大规模作业时长时间 GC 的频率会明显降低。且鉴于 DFS 为 TaskManager 提供了多个分布式节点下载数据,JobManager 网络传输的压力也得以缓解,不再成为瓶颈,这样可以加快任务部署的速度。


图 3 JobManager 将 ShuffleDescriptor 分发至 TaskManager

为了避免缓存过多导致本地磁盘空间不足,当 ShuffleDescriptor 所对应的结果分区被释放时,在 blob 服务器上存储的对应缓存会被清理。此外我们为 TaskManager 上 ShuffleDescriptor 的缓存添加了总大小的限制。当缓存超过一定大小时,缓存会按照最近最少使用 (LRU) 的顺序移除。这样可以保证本地磁盘不会被缓存占满,特别是对于 session 模式运行的集群。

四、针对 Pipelined Region 构建的优化


目前 Flink 中节点间有两种数据交换类型:pipelined 和 blocking。对于 blocking 的数据交换方式,结果分区会在上游全部计算完成后再交由下游进行消费,数据会持久化到本地,支持多次消费。对于 pipelined 数据交换,上游结果分区的产出和下游任务节点的消费是同时进行的,所有数据不会被持久化且只能读取一次。

鉴于 pipelined 的数据流产出和消费同时发生,Flink 需要保证 pipelined 边相连的上下游节点同时运行。由 pipelined 边相连的节点构成了一个 region,被称为 Pipelined Region (以下简称 region)。在 Flink 中,region 是任务调度和 Failover 的基本单位。在调度的过程中,同一 region 内的所有 Task 节点都会被同时调度,而整个拓扑中所有 region 会按照拓扑顺序逐一进行调度。

目前在 Flink 的调度层面有两种 region:逻辑层面的 Logical Pipelined Region 以及执行调度层面的 Scheduling Pipelined Region。逻辑 region 由逻辑拓扑 (JobGraph) 中的节点 JobVertex 构成,而执行 region 则由执行拓扑 (ExecutionGraph) 中的节点 ExecutionVertex 构成。类似于 ExecutionVertex 基于 JobVertex 计算产生,执行 region 是由逻辑 region 计算得到的,如图 4 所示。


图 4 逻辑 region 以及执行 region

在构建 region 的过程中会遇到一个问题:region 之间可能存在环形依赖。对于当前 region,当且仅当其所消费的上游 region 都产出全部数据后才能进行调度。如果两个 region 之间存在环形依赖,那么就会出现调度死锁:两个 region 都需要等对方完成才能调度,最终两个 region 都无法被调度起来。因此,Flink 通过 Tarjan 强连通分量算法来发现环形依赖,并将具有环形依赖的 region 合并成一个 region,这样就能解决调度死锁的问题。Tarjan 强连通分量算法需要遍历拓扑内的所有边,而对于全连接的分发模式来说,其边的数量为 O(N2),因此算法整体的计算复杂度为 O(N2),随着规模变大会显著增长,从而影响大规模作业初始化的时间。


图 5 具有调度死锁的拓扑

为了加快 region 的构建速度,我们可以基于逻辑拓扑和执行拓扑之间的关联进行优化。鉴于一个执行 region 只能由一个逻辑 region 中的节点派生,不会出现跨 region 的情况,Flink 在初始化作业时只需要遍历所有逻辑 region 并逐一转换成执行 region 即可。转换的方式跟分发模式相关。如果逻辑 region 内的节点间有任何全连接边,则整个逻辑 region 可以直接转换成一个执行 region。


图 6 如何将逻辑 region 转换成执行 region

如果全连接边采用的是 pipelined 数据交换,所有与之相连的上下游节点都必须同时运行,也就是说全连接边所连接的所有 region 都要合并成一个 region。如果全连接边采用的是 blocking 数据交换,则会引入环形依赖,如图 5 所示。在这种情况下所有与之相连的 region 都必须合并以避免调度死锁,如图 6 所示。鉴于只要有全连接边就直接生成一整个执行 region,在这种情况下不需要用 Tarjan 算法,整体计算复杂度只需要 O(N) 即可。

如果在逻辑 region 内,所有节点间都只有点对点的分发模式,那么 Flink 依旧直接用 Tarjan 算法来检测环形依赖,鉴于点对点的分发模式其边数为 O(N),算法的时间复杂度也只有 O(N)。

在优化后,将逻辑 region 转换成执行 region 的整体计算复杂度从 O(N2) 降为 O(N)。经测试,对于上文提到的 word count 作业,当两个节点间的连边为全连接边且数据交换方式为 blocking 时,构建 region 的总时间降低了 99%,从 8,257ms 降至 120ms。



FFA 2021 视频回放 & 演讲 PDF 获取


关注Apache Flink,回复:FFA2021

更多 Flink 相关技术问题,可扫码加入社区钉钉交流群~

   戳我,查看 FFA 2021 视频回放~

美团的Hadoop YARN调度性能优化实践


背景

YARN作为Hadoop的资源管理系统,负责Hadoop集群上计算资源的管理和作业调度。

美团的YARN以社区2.7.1版本为基础构建分支。目前在YARN上支撑离线业务、实时业务以及机器学习业务。

  • 离线业务主要运行的是Hive on MapReduce, Spark SQL为主的数据仓库作业。

  • 实时业务主要运行Spark Streaming,Flink为主的实时流计算作业。

  • 机器学习业务主要运行TensorFlow,MXNet,MLX(美团点评自研的大规模机器学习系统)等计算作业。

YARN面临高可用、扩展性、稳定性的问题很多。其中扩展性上遇到最严重的是集群和业务规模增长带来的调度器性能问题。从业务角度来看,假设集群1000台节点,每个节点提供100个CPU的计算能力。每个任务使用1个CPU,平均执行时间1分钟。集群在高峰期始终有超过10万CPU的资源需求。集群的调度器平均每分钟只能调度5万的任务。从分钟级别观察,集群资源使用率是50000/(100*1000)=0.5,那么集群就有50%的计算资源因为调度能力的问题而无法使用。

随着集群规模扩大以及业务量的增长,集群调度能力会随着压力增加而逐渐下降。假设调度能力依然保持不变,每分钟调度5万个任务,按照5000台节点的规模计算,如果不做任何优化改进,那么集群资源使用率为:50000/(100*5000) = 10%,剩余90%的机器资源便无法被利用起来。

这个问题解决后,集群在有空余资源的情况下,作业资源需求可以快速得到满足,集群的计算资源得到充分地利用。

下文会逐步将Hadoop YARN调度系统的核心模块展开说明,揭开上述性能问题的根本原因,提出系统化的解决方案,最终Hadoop YARN达到支撑单集群万级别节点,支持并发运行数万作业的调度能力。

整体架构

YARN架构

YARN负责作业资源调度,在集群中找到满足业务的资源,帮助作业启动任务,管理作业的生命周期。

YARN详细的架构设计请参考Hadoop官方文档

资源抽象

YARN在CPU,Memory这两个资源维度对集群资源做了抽象。

class Resource{
  int cpu;   //cpu核心个数
  int memory-mb; //内存的MB数
}

作业向YARN申请资源的请求是:List[ResourceRequest]

class ResourceRequest{
  int numContainers; //需要的container个数
  Resource capability;//每个container的资源
}

YARN对作业响应是:List[Container]

class Container{
  ContainerId containerId; //YARN全局唯一的container标示
  Resource capability;  //该container的资源信息
  String nodeHttpAddress; //该container可以启动的NodeManager的hostname
}

YARN调度架构

YARN调度器

名词解释

  • ResourceScheduler是YARN的调度器,负责Container的分配。

  • AsyncDispatcher是单线程的事件分发器,负责向调度器发送调度事件。

  • ResourceTrackerService是资源跟踪服务,主要负责接收处理NodeManager的心跳信息。

  • ApplicationMasterService是作业的RPC服务,主要负责接收处理作业的心跳信息。

  • AppMaster是作业的程序控制器,负责跟YARN交互获取/释放资源。

调度流程

  1. 作业资源申请过程:AppMaster通过心跳告知YARN资源需求(List[ResourceRequest]),并取回上次心跳之后,调度器已经分配好的资源(List[Container])。

  2. 调度器分配资源流程是:Nodemanager心跳触发调度器为该NodeManager分配Container。

资源申请和分配是异步进行的。ResourceScheduler是抽象类,需要自行实现。社区实现了公平调度器(FairScheduler)和容量调度器(CapacityScheduler)。美团点评根据自身业务模式的特点,采用的是公平调度器。

公平调度器

作业的组织方式

在公平调度器中,作业(App)是挂载如下图的树形队列的叶子。

美团的Hadoop YARN调度性能优化实践
队列结构

核心调度流程

美团的Hadoop YARN调度性能优化实践

核心调度流程

  1. 调度器锁住FairScheduler对象,避免核心数据结构冲突。

  2. 调度器选取集群的一个节点(Node),从树形队列的根节点ROOT开始出发,每层队列都会按照公平策略选择一个子队列,最后在叶子队列按照公平策略选择一个App,为这个App在Node上找一块适配的资源。

对于每层队列进行如下流程:

  1. 队列预先检查:检查队列的资源使用量是否已经超过了队列的Quota。

  2. 排序子队列/App:按照公平调度策略,对子队列/App进行排序。

  3. 递归调度子队列/App。

例如,某次调度的路径是ROOT ->  ParentQueueA -> LeafQueueA1 -> App11,这次调度会从Node上给App11分配Container。

伪代码

class FairScheduler{
  /* input:NodeId
   *  output:Resource 表示分配出来的某个app的一个container的资源量
   *  root 是树形队列Queue的根
   */

  synchronized Resource attemptScheduling(NodeId node){
    root.assignContainer(NodeId); 
  }
}

class Queue{
  Resource assignContainer(NodeId node){
    if(! preCheck(node) ) return;  //预先检查
      sort(this.children);  //排序
    if(this.isParent){
      for(Queue q: this.children)
        q.assignContainer(node);  //递归调用
    }else{
      for(App app: this.runnableApps)
        app.assignContainer(node); 
    }
  }
}

class App{
  Resource assignContainer(NodeId node){
    ......
  }
}

公平调度器架构

公平调度器是一个多线程异步协作的架构,而为了保证调度过程中数据的一致性,在主要的流程中加入了FairScheduler对象锁。其中核心调度流程是单线程执行的。这意味着Container分配是串行的,这是调度器存在性能瓶颈的核心原因。

美团的Hadoop YARN调度性能优化实践

公平调度器架构

  • Scheduler Lock:FairScheduler对象锁。

  • AllocationFileLoaderService:负责公平策略配置文件的热加载,更新队列数据结构。

  • Continuous Scheduling Thread:核心调度线程,不停地执行上节的核心调度流程。

  • Update Thread:更新队列资源需求,执行Container抢占流程等。

  • Scheduler Event Dispatcher Thread: 调度器事件的处理器,处理App新增,App结束,Node新增,Node移除等事件。

性能评估

上文介绍了公平调度器的架构,在大规模的业务压力下,这个系统存在性能问题。从应用层的表现看,作业资源需求得不到满足。从系统模块看,多个模块协同工作,每个模块多多少少都存在性能问题。如何评估系统性能已经可以满足线上业务的需求?如何评估系统的业务承载能力?我们需要找到一个系统的性能目标。因此在谈性能优化方案之前,需要先说一说调度系统性能评估方法。

一般来说,在线业务系统的性能是用该系统能够承载的QPS和响应的TP99延迟时间来评估,而调度系统与在线业务系统不同的是:调度系统的性能不能用RPC(ResourceManager接收NodeManager和AppMaster的RPC请求)的响应延迟来评估。原因是:这些RPC调用过程跟调度系统的调度过程是异步的,因此不论调度性能多么差,RPC响应几乎不受影响。同理,不论RPC响应多么差,调度性能也几乎不受影响。

业务指标:有效调度

首先从满足业务需求角度分析调度系统的业务指标。调度系统的业务目标是满足业务资源需求。指标是:有效调度(validSchedule)。在生产环境,只要validSchedule达标,我们就认为目前调度器是满足线上业务需求的。

定义validSchedulePerMin表示某一分钟的调度性能达标的情况。达标值为1,不达标值为0。

validPending = min(queuePending, QueueMaxQuota)
if  (usage / total  > 90% || validPending == 0):   validSchedulePerMin = 1 //集群资源使用率高于90%,或者集群有效资源需求为0,这时调度器性能达标。
if (validPending > 0 &&  usage / total < 90%) : validSchedulePerMin = 0;//集群资源使用率低于90%,并且集群存在有效资源需求,这时调度器性能不达标。
  • validPending表示集群中作业有效的资源需求量。

  • queuePending表示队列中所有作业的资源需求量。

  • QueueMaxQuota表示该队列资源最大限额。

  • Usage表示集群已经使用的资源量。

  • Total表示集群总体资源。

设置90%的原因是:资源池中的每个节点可能都有一小部分资源因为无法满足任何的资源需求,出现的资源碎片问题。这个问题类似Linux内存的碎片问题。由于离线作业的任务执行时间非常短,资源很快可以得到回收。在离线计算场景,调度效率的重要性远远大于更精确地管理集群资源碎片,因此离线调度策略暂时没有考虑资源碎片的问题。

validSchedulePerDay表示调度性能每天的达标率。
validSchedulePerDay = ΣvalidSchedulePerMin /1440  

目前线上业务规模下,业务指标如下:
validSchedulePerMin > 0.9; validSchedulePerDay > 0.99

系统性能指标:每秒调度Container数

调度系统的本质是为作业分配Container,因此提出调度系统性能指标CPS--每秒调度Container数。

在生产环境,只要validSchedule达标,表明目前调度器是满足线上业务需求的。而在测试环境,需要关注不同压力条件下的CPS,找到当前系统承载能力的上限,并进一步指导性能优化工作。

CPS与测试压力相关,测试压力越大,CPS可能越低。从上文公平调度器的架构可以看到,CPS跟如下信息相关:

  • 集群总体资源数:集群资源越多,集群可以并发运行的的Container越多,对调度系统产生越大的调度压力。目前每台物理机的CPU、Memory资源量差距不大,因此集群总体资源数主要看集群的物理机节点个数。

  • 集群中正在运行的App数:作业数越多,需要调度的信息越多,调度压力越大。

  • 集群中的队列个数:队列数越多,需要调度的信息越多,调度压力越大。

  • 集群中每个任务的执行时间:任务执行时间越短会导致资源释放越快,那么动态产生的空闲资源越多,对调度系统产生的压力越大。

例如,集群1000个节点,同时运行1000个App,这些App分布在500个Queue上,每个App的每个Container执行时间是1分钟。在这样的压力条件下,调度系统在有大量资源需求的情况下,每秒可以调度1000个Container。那么在这个条件下,调度系统的CPS是1000/s。

调度压力模拟器

在线上环境中,我们可以通过观察上文提到的调度系统的指标来看当前调度性能是否满足业务需求。但我们做了一个性能优化策略,不能直接到在线上环境去试验,因此我们必须有能力在线下环境验证调度器的性能是满足业务需求的,之后才能把试验有效的优化策略推广到线上环境。

那我们在线下也搭建一套跟线上规模一样的集群,是否就可以进行调度器性能优化的分析和研究呢?理论上是可以的,但这需要大量的物理机资源,对公司来说是个巨大的成本。因此我们需要一个调度器的压力模拟器,在不需要大量物理机资源的条件下,能够模拟YARN的调度过程。

社区提供了开源调度器的压力模拟工具——Scheduler Load Simulater(SLS)。

美团的Hadoop YARN调度性能优化实践
调度压力模拟器

如上图,左侧是开源SLS的架构图,整体都在一个进程中,ResourceManager模块里面有一个用线程模拟的Scheduler。App和NM(NodeManager)都是由线程模拟。作业资源申请和NM节点心跳采用方法调用。

开源架构存在的问题有:

  • 模拟大规模APP和NM需要开启大量的线程,导致调度器线程和NM/App的模拟线程争抢cpu资源,影响调度器的评估。

  • SLS的Scheduler Wapper中加入了不合理的逻辑,严重影响调度器的性能。

  • SLS为了通用性考虑,没有侵入FairScheduler的调度过程获取性能指标,仅仅从外围获取了Queue资源需求,Queue资源使用量,App资源需求,App资源使用量等指标。这些指标都不是性能指标,无法利用这些指标分析系统性能瓶颈。

针对存在的问题,我们进行了架构改造。右侧是改造后的架构图,从SLS中剥离Scheduler Wapper的模拟逻辑,用真实的ResourceManager代替。SLS仅仅负责模拟作业的资源申请和节点的心跳汇报。ResourceManager是真实的,线上生产环境和线下压测环境暴露的指标是完全一样的,因此线上线下可以很直观地进行指标对比。详细代码参考:YARN-7672

细粒度监控指标

利用调度压力模拟器进行压测,观察到validSchedule不达标,但依然不清楚性能瓶颈到底在哪里。因此需要细粒度指标来确定性能的瓶颈点。由于调度过程是单线程的,因此细粒度指标获取的手段是侵入FairScheduler,在调度流程中采集关键函数每分钟的时间消耗。目标是找到花费时间占比最多的函数,从而定位系统瓶颈。例如:在preCheck函数的前后加入时间统计,就可以收集到调度过程中preCheck消耗的时间。

基于以上的思路,我们定义了10多个细粒度指标,比较关键的指标有:

  • 每分钟父队列preCheck时间。

  • 每分钟父队列排序时间。

  • 每分钟子队列preCheck时间。

  • 每分钟子队列排序时间。

  • 每分钟为作业分配资源的时间。

  • 每分钟因为作业无资源需求而花费的时间。

关键优化点

第一次做压测,给定的压力就是当时线上生产环境峰值的压力情况(1000节点、1000作业并发、500队列、单Container执行时间40秒)。经过优化后,调度器性能提升,满足业务需求,之后通过预估业务规模增长来调整测试压力,反复迭代地进行优化工作。

下图是性能优化时间线,纵轴为调度性能CPS。

美团的Hadoop YARN调度性能优化实践
性能优化时间线

优化排序比较函数

在核心调度流程中,第2步是排序子队列。观察细粒度指标,可以很清楚地看到每分钟调度流程总共用时50秒,其中排序时间占用了30秒,占了最大比例,因此首先考虑优化排序时间。

排序本身用的快速排序算法,已经没有优化空间。进一步分析排序比较函数,发现排序比较函数的时间复杂度非常高。

计算复杂度最高的部分是:需要获取队列/作业的资源使用情况(resourceUsage)。原算法中,每2个队列进行比较,需要获取resourceUsage的时候,程序都是现场计算。计算方式是递归累加该队列下所有作业的resourceUsage。这造成了巨大的重复计算量。

优化策略:将现场计算优化为提前计算。

提前计算算法:当为某个App分配了一个Container(资源量定义为containerResource),那么递归调整父队列的resourceUsage,让父队列的resourceUsage += containerResource。当释放某个App的一个Container,同样的道理,让父队列resourceUsage -= containerResource。利用提前计算算法,队列resourceUsage的统计时间复杂度降低到O(1)。

优化效果:排序相关的细粒度指标耗时明显下降。

美团的Hadoop YARN调度性能优化实践
优化排序比较函数效果

红框中的指标表示每分钟调度器用来做队列/作业排序的时间。从图中可以看出,经过优化,排序时间从每分钟30G(30秒)下降到5G(5秒)以内。详细代码参考:YARN-5969

优化作业跳过时间

从上图看,优化排序比较函数后,蓝色的线有明显的增加,从2秒增加到了20秒。这条蓝线指标含义是每分钟调度器跳过没有资源需求的作业花费的时间。从时间占比角度来看,目前优化目标是减少这条蓝线的时间。

分析代码发现,所有队列/作业都会参与调度。但其实很多队列/作业根本没有资源需求,并不需要参与调度。因此优化策略是:在排序之前,从队列的Children中剔除掉没有资源需求的队列/作业。

优化效果:这个指标从20秒下降到几乎可以忽略不计。详细代码参考:YARN-3547

美团的Hadoop YARN调度性能优化实践
优化作业跳过时间

这时,从上图中可以明显看到有一条线呈现上升趋势,并且这个指标占了整个调度时间的最大比例。这条线对应的指标含义是确定要调度的作业后,调度器为这个作业分配出一个Container花费的时间。这部分逻辑平均执行一次的时间在0.02ms以内,并且不会随着集群规模、作业规模的增加而增加,因此暂时不做进一步优化。

队列并行排序优化

从核心调度流程可以看出,分配每一个Container,都需要进行队列的排序。排序的时间会随着业务规模增加(作业数、队列数的增加)而线性增加。

架构思考:对于公平调度器来说,排序是为了实现公平的调度策略,但资源需求是时时刻刻变化的,每次变化,都会引起作业资源使用的不公平。即使分配每一个Container时都进行排序,也无法在整个时间轴上达成公平策略。

例如,集群有10个CPU,T1时刻,集群只有一个作业App1在运行,申请了10个CPU,那么集群会把这10个CPU都分配给App1。T2时刻(T2 > T1,集群中新来一个作业App2,这时集群已经没有资源了,因此无法为App2分配资源。这时集群中App1和App2对资源的使用是不公平的。从这个例子看,仅仅通过调度的分配算法是无法在时间轴上实现公平调度。

目前公平调度器的公平策略是保证集群在某一时刻资源调度的公平。在整个时间轴上是需要抢占策略来补充达到公平的目标。因此从时间轴的角度考虑,没有必要在分配每一个Container时都进行排序。

综上分析,优化策略是排序过程与调度过程并行化。要点如下:

  1. 调度过程不再进行排序的步骤。

  2. 独立的线程池处理所有队列的排序,其中每个线程处理一个队列的排序。

  3. 排序之前,通过深度克隆队列/作业中用于排序部分的信息,保证排序过程中队列/作业的数据结构不变。

美团的Hadoop YARN调度性能优化实践
并行排序优化

优化效果如下:

队列排序效率:利用线程池对2000个队列进行一次排序只需要5毫秒以内(2ms-5ms),在一秒内至少可以完成200次排序,对业务完全没有影响。

在并行运行1万作业,集群1.2万的节点,队列个数2000,单Container执行时间40秒的压力下,调度CPS达到5万,在一分钟内可以将整个集群资源打满,并持续打满。

美团的Hadoop YARN调度性能优化实践
并发作业数
美团的Hadoop YARN调度性能优化实践
作业资源需求量
美团的Hadoop YARN调度性能优化实践
集群资源使用率

上图中,15:26分,Pending值是0,表示这时集群目前所有的资源需求已经被调度完成。15:27分,resourceUsage达到1.0,表示集群资源使用率为100%,集群没有空闲资源。Pending值达到4M(400万 mb的内存需求)是因为没有空闲资源导致的资源等待。

稳定上线的策略

线下压测的结果非常好,最终要上到线上才能达成业务目标。然而稳定上线是有难度的,原因:

  • 线上环境和线下压测环境中的业务差别非常大。线下没问题,上线不一定没问题。

  • 当时YARN集群只有一个,那么调度器也只有一个,如果调度器出现异常,是整个集群的灾难,导致整个集群不可用。 

除了常规的单元测试、功能测试、压力测试、设置报警指标之外,我们根据业务场景提出了针对集群调度系统的上线策略。

在线回滚策略

这里的关键问题是:系统通过配置加载线程更新了调度器某个参数的值,而调度线程也同时在按照这个参数值进行工作。在一次调度过程中可能多次查看这个参数的值,并且根据参数值来执行相应的逻辑。调度线程在一次调度过程中观察到的参数值发生变化,就会导致系统异常。

处理办法是通过复制资源的方式,避免多线程共享资源引起数据不一致的问题。调度线程在每次调度开始阶段,先将当前所有性能优化参数进行复制,确保在本次调度过程中观察到的参数不会变更。

数据自动校验策略

优化算法是为了提升性能,但要注意不能影响算法的输出结果,确保算法正确性。对于复杂的算法优化,确保算法正确性是一个很有难度的工作。

在“优化排序比较时间”的研发中,变更了队列resourceUsage的计算方法,从现场计算变更为提前计算。那么如何保证优化后算法计算出来的resourceUsage是正确的呢?

即使做了单元策略,功能测试,压力测试,但面对一个复杂系统,依然不能有100%的把握。另外,未来系统升级也可能引起这部分功能的Bug。

算法变更后,如果新的resourceUsage计算错误,那么就会导致调度策略一直错误执行下去。从而影响队列的资源分配。会对业务产生巨大的影响。例如,业务拿不到原本的资源量,导致业务延迟。

通过原先现场计算的方式得到的所有队列的resourceUsage一定是正确的,定义为oldResourceUsage。算法优化后,通过提前计算的方式得到所有队列的resourceUsage,定义为newResourceUsage。

在系统中,定期对oldResourceUsage和newResourceUsage进行比较,如果发现数据不一致,说明优化的算法有Bug,newResourceUsage计算错误。这时系统会向RD发送报警通知,同时自动地将所有计算错误的数据用正确的数据替换,使得错误得到及时自动修正。

总结与未来展望

本文主要介绍了美团点评Hadoop YARN集群公平调度器的性能优化实践。

  1. 做性能优化,首先要定义宏观的性能指标,从而能够评估系统的性能。

  2. 定义压测需要观察的细粒度指标,才能清晰看到系统的瓶颈。

  3. 工欲善其事,必先利其器。高效的压力测试工具是性能优化必备的利器。

  4. 优化算法的思路主要有:降低算法时间复杂度;减少重复计算和不必要的计算;并行化。

  5. 性能优化是永无止境的,要根据真实业务来合理预估业务压力,逐步开展性能优化的工作。

  6. 代码上线需谨慎,做好防御方案。

单个YARN集群调度器的性能优化总是有限的,目前我们可以支持1万节点的集群规模,那么未来10万,100万的节点我们如何应对?

我们的解决思路是:基于社区的思路,设计适合美团点评的业务场景的技术方案。社区Hadoop 3.0研发了Global Scheduling,完全颠覆了目前YARN调度器的架构,可以极大提高单集群调度性能。我们正在跟进这个Feature。社区的YARN Federation已经逐步完善。该架构可以支撑多个YARN集群对外提供统一的集群计算服务,由于每个YARN集群都有自己的调度器,这相当于横向扩展了调度器的个数,从而提高集群整体的调度能力。我们基于社区的架构,结合美团点评的业务场景,正在不断地完善美团点评的YARN Federation。

作者简介

世龙、廷稳美团用户平台大数据与算法部研发工程师。

About团队

数据平台资源调度团队隶属美团用户平台大数据与算法部,目标是建设超大规模、高性能、支持异构计算资源和多场景的资源调度系统。目前管理的计算节点接近 3 万台,在单集群节点过万的规模下实现了单日数十万离线计算作业的高效调度,资源利用率超过 90%。资源调度系统同时实现了对实时计算作业、机器学习模型 Serving 服务等高可用场景的支持,可用性超过 99.9%。系统也提供了对 CPU/GPU 等异构资源的调度支持,实现了数千张 GPU卡的高效调度,以及 CPU 资源的离线与训练混合调度,目前正在引入 NPU/FPGA 等更多异构资源,针对机器学习场景的特点实现更高效合理的调度策略。

----------  END  ----------




以上是关于Flink 大规模作业调度性能优化的主要内容,如果未能解决你的问题,请参考以下文章

Flink 1.13,面向流批一体的运行时与 DataStream API 优化

Flink 1.13,面向流批一体的运行时与 DataStream API 优化

美团的Hadoop YARN调度性能优化实践

走近伏羲,谈5000节点集群调度与性能优化

FlinkFlink 部署性能优化

Flink内幕-作业调度--flink1.13