Flink 大规模作业调度性能优化
Posted Apache Flink
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 大规模作业调度性能优化相关的知识,希望对你有一定的参考价值。
性能测评结果 基于拓扑结构的优化 优化任务部署 针对 Pipelined Region 构建的优化
需要很长时间才能完成作业的调度和部署; 需要大量内存来存储作业的执行拓扑图以及部署时所需的临时变量,并且在运行过程中会出现频繁的长时间 GC,影响集群稳定性;
为了优化 Flink 调度大规模作业的性能,我们在 Flink 1.13 版本和 1.14 版本进行了以下优化:
针对拓扑结构引入分组概念,优化与拓扑相关的计算逻辑,主要包括作业初始化、Task 调度以及故障恢复时计算需要重启的 Task 节点等等。与此同时,该优化降低了执行拓扑占用的内存空间; 引入缓存机制优化任务部署,优化后部署速度更快且所需内存更少; 基于逻辑拓扑和执行拓扑的特性进行优化以加快 Pipelined Region 的构建速度,从而降低作业初始化所需的时间。
一、性能评测结果
1.12 | 1.14 | 时间降低百分比(%) | |
作业初始化 | 11,431ms | 627ms | 94.51% |
任务部署 | 63,118ms | 17,183ms | 72.78% |
故障恢复时计算重启节点 | 37,195ms | 170ms | 99.55% |
二、基于拓扑结构的优化
图 1 目前 Flink 的两种分发模式
三、优化任务部署
目前任务部署包含以下几个阶段:
JobManager 在主线程内为每一个 Task 创建任务部署描述符 (TaskDeploymentDescriptor,以下简称 TDD); JobManager 在异步线程内将这些 TDD 进行序列化; JobManager 通过 RPC 通信将序列化后的 TDD 发送至 TaskManager; TaskManager 基于 TDD 创建任务并执行。
四、针对 Pipelined Region 构建的优化
FFA 2021 视频回放 & 演讲 PDF 获取
关注「Apache Flink」,回复:FFA2021
戳我,查看 FFA 2021 视频回放~
美团的Hadoop YARN调度性能优化实践
背景
YARN作为Hadoop的资源管理系统,负责Hadoop集群上计算资源的管理和作业调度。
美团的YARN以社区2.7.1版本为基础构建分支。目前在YARN上支撑离线业务、实时业务以及机器学习业务。
离线业务主要运行的是Hive on MapReduce, Spark SQL为主的数据仓库作业。
实时业务主要运行Spark Streaming,Flink为主的实时流计算作业。
机器学习业务主要运行TensorFlow,MXNet,MLX(美团点评自研的大规模机器学习系统)等计算作业。
YARN面临高可用、扩展性、稳定性的问题很多。其中扩展性上遇到最严重的是集群和业务规模增长带来的调度器性能问题。从业务角度来看,假设集群1000台节点,每个节点提供100个CPU的计算能力。每个任务使用1个CPU,平均执行时间1分钟。集群在高峰期始终有超过10万CPU的资源需求。集群的调度器平均每分钟只能调度5万的任务。从分钟级别观察,集群资源使用率是50000/(100*1000)=0.5,那么集群就有50%的计算资源因为调度能力的问题而无法使用。
随着集群规模扩大以及业务量的增长,集群调度能力会随着压力增加而逐渐下降。假设调度能力依然保持不变,每分钟调度5万个任务,按照5000台节点的规模计算,如果不做任何优化改进,那么集群资源使用率为:50000/(100*5000) = 10%,剩余90%的机器资源便无法被利用起来。
这个问题解决后,集群在有空余资源的情况下,作业资源需求可以快速得到满足,集群的计算资源得到充分地利用。
下文会逐步将Hadoop YARN调度系统的核心模块展开说明,揭开上述性能问题的根本原因,提出系统化的解决方案,最终Hadoop YARN达到支撑单集群万级别节点,支持并发运行数万作业的调度能力。
整体架构
YARN架构
YARN负责作业资源调度,在集群中找到满足业务的资源,帮助作业启动任务,管理作业的生命周期。
YARN详细的架构设计请参考Hadoop官方文档。
资源抽象
YARN在CPU,Memory这两个资源维度对集群资源做了抽象。
class Resource{
int cpu; //cpu核心个数
int memory-mb; //内存的MB数
}
作业向YARN申请资源的请求是:List[ResourceRequest]
class ResourceRequest{
int numContainers; //需要的container个数
Resource capability;//每个container的资源
}
YARN对作业响应是:List[Container]
class Container{
ContainerId containerId; //YARN全局唯一的container标示
Resource capability; //该container的资源信息
String nodeHttpAddress; //该container可以启动的NodeManager的hostname
}
YARN调度架构
名词解释
ResourceScheduler是YARN的调度器,负责Container的分配。
AsyncDispatcher是单线程的事件分发器,负责向调度器发送调度事件。
ResourceTrackerService是资源跟踪服务,主要负责接收处理NodeManager的心跳信息。
ApplicationMasterService是作业的RPC服务,主要负责接收处理作业的心跳信息。
AppMaster是作业的程序控制器,负责跟YARN交互获取/释放资源。
调度流程
作业资源申请过程:AppMaster通过心跳告知YARN资源需求(List[ResourceRequest]),并取回上次心跳之后,调度器已经分配好的资源(List[Container])。
调度器分配资源流程是:Nodemanager心跳触发调度器为该NodeManager分配Container。
资源申请和分配是异步进行的。ResourceScheduler是抽象类,需要自行实现。社区实现了公平调度器(FairScheduler)和容量调度器(CapacityScheduler)。美团点评根据自身业务模式的特点,采用的是公平调度器。
公平调度器
作业的组织方式
在公平调度器中,作业(App)是挂载如下图的树形队列的叶子。
核心调度流程
调度器锁住FairScheduler对象,避免核心数据结构冲突。
调度器选取集群的一个节点(Node),从树形队列的根节点ROOT开始出发,每层队列都会按照公平策略选择一个子队列,最后在叶子队列按照公平策略选择一个App,为这个App在Node上找一块适配的资源。
对于每层队列进行如下流程:
队列预先检查:检查队列的资源使用量是否已经超过了队列的Quota。
排序子队列/App:按照公平调度策略,对子队列/App进行排序。
递归调度子队列/App。
例如,某次调度的路径是ROOT -> ParentQueueA -> LeafQueueA1 -> App11,这次调度会从Node上给App11分配Container。
伪代码
class FairScheduler{
/* input:NodeId
* output:Resource 表示分配出来的某个app的一个container的资源量
* root 是树形队列Queue的根
*/
synchronized Resource attemptScheduling(NodeId node){
root.assignContainer(NodeId);
}
}
class Queue{
Resource assignContainer(NodeId node){
if(! preCheck(node) ) return; //预先检查
sort(this.children); //排序
if(this.isParent){
for(Queue q: this.children)
q.assignContainer(node); //递归调用
}else{
for(App app: this.runnableApps)
app.assignContainer(node);
}
}
}
class App{
Resource assignContainer(NodeId node){
......
}
}
公平调度器架构
公平调度器是一个多线程异步协作的架构,而为了保证调度过程中数据的一致性,在主要的流程中加入了FairScheduler对象锁。其中核心调度流程是单线程执行的。这意味着Container分配是串行的,这是调度器存在性能瓶颈的核心原因。
Scheduler Lock:FairScheduler对象锁。
AllocationFileLoaderService:负责公平策略配置文件的热加载,更新队列数据结构。
Continuous Scheduling Thread:核心调度线程,不停地执行上节的核心调度流程。
Update Thread:更新队列资源需求,执行Container抢占流程等。
Scheduler Event Dispatcher Thread: 调度器事件的处理器,处理App新增,App结束,Node新增,Node移除等事件。
性能评估
上文介绍了公平调度器的架构,在大规模的业务压力下,这个系统存在性能问题。从应用层的表现看,作业资源需求得不到满足。从系统模块看,多个模块协同工作,每个模块多多少少都存在性能问题。如何评估系统性能已经可以满足线上业务的需求?如何评估系统的业务承载能力?我们需要找到一个系统的性能目标。因此在谈性能优化方案之前,需要先说一说调度系统性能评估方法。
一般来说,在线业务系统的性能是用该系统能够承载的QPS和响应的TP99延迟时间来评估,而调度系统与在线业务系统不同的是:调度系统的性能不能用RPC(ResourceManager接收NodeManager和AppMaster的RPC请求)的响应延迟来评估。原因是:这些RPC调用过程跟调度系统的调度过程是异步的,因此不论调度性能多么差,RPC响应几乎不受影响。同理,不论RPC响应多么差,调度性能也几乎不受影响。
业务指标:有效调度
首先从满足业务需求角度分析调度系统的业务指标。调度系统的业务目标是满足业务资源需求。指标是:有效调度(validSchedule)。在生产环境,只要validSchedule达标,我们就认为目前调度器是满足线上业务需求的。
定义validSchedulePerMin表示某一分钟的调度性能达标的情况。达标值为1,不达标值为0。
validPending = min(queuePending, QueueMaxQuota)
if (usage / total > 90% || validPending == 0): validSchedulePerMin = 1 //集群资源使用率高于90%,或者集群有效资源需求为0,这时调度器性能达标。
if (validPending > 0 && usage / total < 90%) : validSchedulePerMin = 0;//集群资源使用率低于90%,并且集群存在有效资源需求,这时调度器性能不达标。
validPending表示集群中作业有效的资源需求量。
queuePending表示队列中所有作业的资源需求量。
QueueMaxQuota表示该队列资源最大限额。
Usage表示集群已经使用的资源量。
Total表示集群总体资源。
设置90%的原因是:资源池中的每个节点可能都有一小部分资源因为无法满足任何的资源需求,出现的资源碎片问题。这个问题类似Linux内存的碎片问题。由于离线作业的任务执行时间非常短,资源很快可以得到回收。在离线计算场景,调度效率的重要性远远大于更精确地管理集群资源碎片,因此离线调度策略暂时没有考虑资源碎片的问题。
validSchedulePerDay表示调度性能每天的达标率。validSchedulePerDay = ΣvalidSchedulePerMin /1440
目前线上业务规模下,业务指标如下:validSchedulePerMin > 0.9; validSchedulePerDay > 0.99
系统性能指标:每秒调度Container数
调度系统的本质是为作业分配Container,因此提出调度系统性能指标CPS--每秒调度Container数。
在生产环境,只要validSchedule达标,表明目前调度器是满足线上业务需求的。而在测试环境,需要关注不同压力条件下的CPS,找到当前系统承载能力的上限,并进一步指导性能优化工作。
CPS与测试压力相关,测试压力越大,CPS可能越低。从上文公平调度器的架构可以看到,CPS跟如下信息相关:
集群总体资源数:集群资源越多,集群可以并发运行的的Container越多,对调度系统产生越大的调度压力。目前每台物理机的CPU、Memory资源量差距不大,因此集群总体资源数主要看集群的物理机节点个数。
集群中正在运行的App数:作业数越多,需要调度的信息越多,调度压力越大。
集群中的队列个数:队列数越多,需要调度的信息越多,调度压力越大。
集群中每个任务的执行时间:任务执行时间越短会导致资源释放越快,那么动态产生的空闲资源越多,对调度系统产生的压力越大。
例如,集群1000个节点,同时运行1000个App,这些App分布在500个Queue上,每个App的每个Container执行时间是1分钟。在这样的压力条件下,调度系统在有大量资源需求的情况下,每秒可以调度1000个Container。那么在这个条件下,调度系统的CPS是1000/s。
调度压力模拟器
在线上环境中,我们可以通过观察上文提到的调度系统的指标来看当前调度性能是否满足业务需求。但我们做了一个性能优化策略,不能直接到在线上环境去试验,因此我们必须有能力在线下环境验证调度器的性能是满足业务需求的,之后才能把试验有效的优化策略推广到线上环境。
那我们在线下也搭建一套跟线上规模一样的集群,是否就可以进行调度器性能优化的分析和研究呢?理论上是可以的,但这需要大量的物理机资源,对公司来说是个巨大的成本。因此我们需要一个调度器的压力模拟器,在不需要大量物理机资源的条件下,能够模拟YARN的调度过程。
社区提供了开源调度器的压力模拟工具——Scheduler Load Simulater(SLS)。
如上图,左侧是开源SLS的架构图,整体都在一个进程中,ResourceManager模块里面有一个用线程模拟的Scheduler。App和NM(NodeManager)都是由线程模拟。作业资源申请和NM节点心跳采用方法调用。
开源架构存在的问题有:
模拟大规模APP和NM需要开启大量的线程,导致调度器线程和NM/App的模拟线程争抢cpu资源,影响调度器的评估。
SLS的Scheduler Wapper中加入了不合理的逻辑,严重影响调度器的性能。
SLS为了通用性考虑,没有侵入FairScheduler的调度过程获取性能指标,仅仅从外围获取了Queue资源需求,Queue资源使用量,App资源需求,App资源使用量等指标。这些指标都不是性能指标,无法利用这些指标分析系统性能瓶颈。
针对存在的问题,我们进行了架构改造。右侧是改造后的架构图,从SLS中剥离Scheduler Wapper的模拟逻辑,用真实的ResourceManager代替。SLS仅仅负责模拟作业的资源申请和节点的心跳汇报。ResourceManager是真实的,线上生产环境和线下压测环境暴露的指标是完全一样的,因此线上线下可以很直观地进行指标对比。详细代码参考:YARN-7672
细粒度监控指标
利用调度压力模拟器进行压测,观察到validSchedule不达标,但依然不清楚性能瓶颈到底在哪里。因此需要细粒度指标来确定性能的瓶颈点。由于调度过程是单线程的,因此细粒度指标获取的手段是侵入FairScheduler,在调度流程中采集关键函数每分钟的时间消耗。目标是找到花费时间占比最多的函数,从而定位系统瓶颈。例如:在preCheck函数的前后加入时间统计,就可以收集到调度过程中preCheck消耗的时间。
基于以上的思路,我们定义了10多个细粒度指标,比较关键的指标有:
每分钟父队列preCheck时间。
每分钟父队列排序时间。
每分钟子队列preCheck时间。
每分钟子队列排序时间。
每分钟为作业分配资源的时间。
每分钟因为作业无资源需求而花费的时间。
关键优化点
第一次做压测,给定的压力就是当时线上生产环境峰值的压力情况(1000节点、1000作业并发、500队列、单Container执行时间40秒)。经过优化后,调度器性能提升,满足业务需求,之后通过预估业务规模增长来调整测试压力,反复迭代地进行优化工作。
下图是性能优化时间线,纵轴为调度性能CPS。
优化排序比较函数
在核心调度流程中,第2步是排序子队列。观察细粒度指标,可以很清楚地看到每分钟调度流程总共用时50秒,其中排序时间占用了30秒,占了最大比例,因此首先考虑优化排序时间。
排序本身用的快速排序算法,已经没有优化空间。进一步分析排序比较函数,发现排序比较函数的时间复杂度非常高。
计算复杂度最高的部分是:需要获取队列/作业的资源使用情况(resourceUsage)。原算法中,每2个队列进行比较,需要获取resourceUsage的时候,程序都是现场计算。计算方式是递归累加该队列下所有作业的resourceUsage。这造成了巨大的重复计算量。
优化策略:将现场计算优化为提前计算。
提前计算算法:当为某个App分配了一个Container(资源量定义为containerResource),那么递归调整父队列的resourceUsage,让父队列的resourceUsage += containerResource。当释放某个App的一个Container,同样的道理,让父队列resourceUsage -= containerResource。利用提前计算算法,队列resourceUsage的统计时间复杂度降低到O(1)。
优化效果:排序相关的细粒度指标耗时明显下降。
红框中的指标表示每分钟调度器用来做队列/作业排序的时间。从图中可以看出,经过优化,排序时间从每分钟30G(30秒)下降到5G(5秒)以内。详细代码参考:YARN-5969
优化作业跳过时间
从上图看,优化排序比较函数后,蓝色的线有明显的增加,从2秒增加到了20秒。这条蓝线指标含义是每分钟调度器跳过没有资源需求的作业花费的时间。从时间占比角度来看,目前优化目标是减少这条蓝线的时间。
分析代码发现,所有队列/作业都会参与调度。但其实很多队列/作业根本没有资源需求,并不需要参与调度。因此优化策略是:在排序之前,从队列的Children中剔除掉没有资源需求的队列/作业。
优化效果:这个指标从20秒下降到几乎可以忽略不计。详细代码参考:YARN-3547
这时,从上图中可以明显看到有一条线呈现上升趋势,并且这个指标占了整个调度时间的最大比例。这条线对应的指标含义是确定要调度的作业后,调度器为这个作业分配出一个Container花费的时间。这部分逻辑平均执行一次的时间在0.02ms以内,并且不会随着集群规模、作业规模的增加而增加,因此暂时不做进一步优化。
队列并行排序优化
从核心调度流程可以看出,分配每一个Container,都需要进行队列的排序。排序的时间会随着业务规模增加(作业数、队列数的增加)而线性增加。
架构思考:对于公平调度器来说,排序是为了实现公平的调度策略,但资源需求是时时刻刻变化的,每次变化,都会引起作业资源使用的不公平。即使分配每一个Container时都进行排序,也无法在整个时间轴上达成公平策略。
例如,集群有10个CPU,T1时刻,集群只有一个作业App1在运行,申请了10个CPU,那么集群会把这10个CPU都分配给App1。T2时刻(T2 > T1),集群中新来一个作业App2,这时集群已经没有资源了,因此无法为App2分配资源。这时集群中App1和App2对资源的使用是不公平的。从这个例子看,仅仅通过调度的分配算法是无法在时间轴上实现公平调度。
目前公平调度器的公平策略是保证集群在某一时刻资源调度的公平。在整个时间轴上是需要抢占策略来补充达到公平的目标。因此从时间轴的角度考虑,没有必要在分配每一个Container时都进行排序。
综上分析,优化策略是排序过程与调度过程并行化。要点如下:
调度过程不再进行排序的步骤。
独立的线程池处理所有队列的排序,其中每个线程处理一个队列的排序。
排序之前,通过深度克隆队列/作业中用于排序部分的信息,保证排序过程中队列/作业的数据结构不变。
优化效果如下:
队列排序效率:利用线程池对2000个队列进行一次排序只需要5毫秒以内(2ms-5ms),在一秒内至少可以完成200次排序,对业务完全没有影响。
在并行运行1万作业,集群1.2万的节点,队列个数2000,单Container执行时间40秒的压力下,调度CPS达到5万,在一分钟内可以将整个集群资源打满,并持续打满。
上图中,15:26分,Pending值是0,表示这时集群目前所有的资源需求已经被调度完成。15:27分,resourceUsage达到1.0,表示集群资源使用率为100%,集群没有空闲资源。Pending值达到4M(400万 mb的内存需求)是因为没有空闲资源导致的资源等待。
稳定上线的策略
线下压测的结果非常好,最终要上到线上才能达成业务目标。然而稳定上线是有难度的,原因:
线上环境和线下压测环境中的业务差别非常大。线下没问题,上线不一定没问题。
当时YARN集群只有一个,那么调度器也只有一个,如果调度器出现异常,是整个集群的灾难,导致整个集群不可用。
除了常规的单元测试、功能测试、压力测试、设置报警指标之外,我们根据业务场景提出了针对集群调度系统的上线策略。
在线回滚策略
这里的关键问题是:系统通过配置加载线程更新了调度器某个参数的值,而调度线程也同时在按照这个参数值进行工作。在一次调度过程中可能多次查看这个参数的值,并且根据参数值来执行相应的逻辑。调度线程在一次调度过程中观察到的参数值发生变化,就会导致系统异常。
处理办法是通过复制资源的方式,避免多线程共享资源引起数据不一致的问题。调度线程在每次调度开始阶段,先将当前所有性能优化参数进行复制,确保在本次调度过程中观察到的参数不会变更。
数据自动校验策略
优化算法是为了提升性能,但要注意不能影响算法的输出结果,确保算法正确性。对于复杂的算法优化,确保算法正确性是一个很有难度的工作。
在“优化排序比较时间”的研发中,变更了队列resourceUsage的计算方法,从现场计算变更为提前计算。那么如何保证优化后算法计算出来的resourceUsage是正确的呢?
即使做了单元策略,功能测试,压力测试,但面对一个复杂系统,依然不能有100%的把握。另外,未来系统升级也可能引起这部分功能的Bug。
算法变更后,如果新的resourceUsage计算错误,那么就会导致调度策略一直错误执行下去。从而影响队列的资源分配。会对业务产生巨大的影响。例如,业务拿不到原本的资源量,导致业务延迟。
通过原先现场计算的方式得到的所有队列的resourceUsage一定是正确的,定义为oldResourceUsage。算法优化后,通过提前计算的方式得到所有队列的resourceUsage,定义为newResourceUsage。
在系统中,定期对oldResourceUsage和newResourceUsage进行比较,如果发现数据不一致,说明优化的算法有Bug,newResourceUsage计算错误。这时系统会向RD发送报警通知,同时自动地将所有计算错误的数据用正确的数据替换,使得错误得到及时自动修正。
总结与未来展望
本文主要介绍了美团点评Hadoop YARN集群公平调度器的性能优化实践。
做性能优化,首先要定义宏观的性能指标,从而能够评估系统的性能。
定义压测需要观察的细粒度指标,才能清晰看到系统的瓶颈。
工欲善其事,必先利其器。高效的压力测试工具是性能优化必备的利器。
优化算法的思路主要有:降低算法时间复杂度;减少重复计算和不必要的计算;并行化。
性能优化是永无止境的,要根据真实业务来合理预估业务压力,逐步开展性能优化的工作。
代码上线需谨慎,做好防御方案。
单个YARN集群调度器的性能优化总是有限的,目前我们可以支持1万节点的集群规模,那么未来10万,100万的节点我们如何应对?
我们的解决思路是:基于社区的思路,设计适合美团点评的业务场景的技术方案。社区Hadoop 3.0研发了Global Scheduling,完全颠覆了目前YARN调度器的架构,可以极大提高单集群调度性能。我们正在跟进这个Feature。社区的YARN Federation已经逐步完善。该架构可以支撑多个YARN集群对外提供统一的集群计算服务,由于每个YARN集群都有自己的调度器,这相当于横向扩展了调度器的个数,从而提高集群整体的调度能力。我们基于社区的架构,结合美团点评的业务场景,正在不断地完善美团点评的YARN Federation。
作者简介
世龙、廷稳,美团用户平台大数据与算法部研发工程师。
About团队
数据平台资源调度团队隶属美团用户平台大数据与算法部,目标是建设超大规模、高性能、支持异构计算资源和多场景的资源调度系统。目前管理的计算节点接近 3 万台,在单集群节点过万的规模下实现了单日数十万离线计算作业的高效调度,资源利用率超过 90%。资源调度系统同时实现了对实时计算作业、机器学习模型 Serving 服务等高可用场景的支持,可用性超过 99.9%。系统也提供了对 CPU/GPU 等异构资源的调度支持,实现了数千张 GPU卡的高效调度,以及 CPU 资源的离线与训练混合调度,目前正在引入 NPU/FPGA 等更多异构资源,针对机器学习场景的特点实现更高效合理的调度策略。
---------- END ----------
以上是关于Flink 大规模作业调度性能优化的主要内容,如果未能解决你的问题,请参考以下文章
Flink 1.13,面向流批一体的运行时与 DataStream API 优化