Spark的Task调度原理

Posted 大数据架构师修行之路

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark的Task调度原理相关的知识,希望对你有一定的参考价值。

在给定的Spark应用中,若多个并行Job是从独立的线程提交的,则他们可以并行运行。这里的Job是指,执行Spark的Action函数产生的一些列动作。

默认情况下,Spark的调度程序以FIFO方式运行Task。每个Job都被划分为“Stage”(例如,map和reduce阶段),当Stage有Task需要提交执行时,第一个Job在所有可用资源上都具有优先级;然后是第二个Job具有优先级,依此类推。

若调度队列头部的Job不需要使用整个集群资源,后面的Job可以立即执行。但若队列头部的Job很大,则后面的Job必须等待,就可能会被大大延迟。

最终Stage提交时,会以TaskSet的形式提交,这是通过任务调度器(实现TaskScheduler接口)来完成的。在调度TaskSet时,会借助调度对象(实现接口Schedulable)和调度算法(FIFO和FAIR)。

任务调度器的调度对象分为两类:Pool和TaskSetManager。而这两类调度对象都实现了接口Schedulable。并会根据调度算法对需要调度的对象进行调度。

Task总体调度流程

TaskSet的最终提交由调度后台(其实是CoarseGrainedSchedulerBackend类)的reviveOffers函数向DriverEndpoint RPC环境发送ReviveOffers命令,调度后台的DriverEndpoint调用makeOffers函数来完成TaskSet的提交。Task提交的总体流程如下:

1.在executorDataMap中,过滤掉已经死掉的Executor。executorDataMap是由Executor端发送的StatusUpdate消息来更新的。

2.遍历所有存活的Executor,并进行以下检查:

(1)判断目前的Executor是否在存活的Executor队列中,若不在,则需要把该Executor添加到对应主机的可用Executor集合中。

(2)过滤掉在黑名单(若启用的话)中的Executor,剩下的就是可以使用的Executor资源列表。

3.遍历可用的Executor列表,获取每个Executor的空闲CPU数量

4.根据空闲CPU数量和每个Task需要的CPU数量(参数spark.task.cpus的值,默认是1),来决定可以提交的Task的数量。计算公式很简单:每个Executor可运行的Task数=每个Executor的空闲CPU数量/每个Task需要的CPU数量。总的可运行Task数,只需要把每个Executor可运行的Task数量相加。

5.根据资源调度算法对TaskSet对应的TaskSetManager调度对象进行排序

6.若本次的TaskSet需要一次性提交,且可以运行Task的资源(CPU)数,小于本次需要提交的Task数,则跳过本次Task的提交。

调度算法

FIFO调度算法

该调度算法通过比较调度对象的优先级(priority)和stageId的值来决定先执行哪个调度对象。

FIFO调度算法流程如下:

  1. 比较调度对象s1和s2的优先级字段(priority)的值,若s1的值小于s2的值,则s1先执行;若两者相等,则继续进行第2步的判断。要注意的是:这里的priority其实是jobId的值,所以,这里比较的其实是jobId。其意义为:先提交的jobId会比后提交的jobId小,就先执行。

  2. 若调度对象的priority的值相等,则继续比较stageId的值,先执行较小stageId的stage。其意义为:先创建的stage,就先执行。注意stageId是唯一的,不可能相等。

从该算的流程可以看出,该算法基本遵循先创建先执行的原则,所以也被称为FIFO调度算法(First In First Out)。

FIFO调度算法的实现类是:FIFOSchedulingAlgorithm。而该类其实是实现了接口SchedulingAlgorithm中的comparator函数。通过该函数来比较优先级(priority)和stageId的值。

在默认调度器TaskSchedulerImp中FIFO调度队列,如图1所示:

如图1所示,若是有多个job同时提交,会把这些job创建的调度对象TaskSetManager都放到调度池rootPool的调度队列中。在提交TaskSet时,会按FIFO调度算法对这些调度对象(TaskSetManager)进行排序,然后按顺序进行TaskSet的提交。

FAIR调度算法

从Spark 0.8开始,还可以配置Job之间的公平共享。在公平共享下,Spark以“轮循”方式在作业之间分配任务,以便所有Job都获得大致相等的群集资源份额。这意味着在运行长Job时提交的短Job可以立即开始接收资源,并且仍然获得良好的响应时间,而无需等待长作业完成。此模式最适合多用户设置。

公平调度程序还支持将job分组到不同的调度池中,并为每个池设置不同的调度选项(例如:weight)。这样就可以为重要的job创建“高优先级”的调度池,或则将每个用户的作业分组在一起,并为用户提供相等的资源,而不管他们有多少并发任务,而不是以作业来分配资源。

没有任何干预,新提交的作业将进入默认池,但是可以通过在提交线程的SparkContext中添加spark.scheduler.pool“本地属性”来设置作业的池。这样做如下:

 // Assuming sc is your SparkContext variable
 sc.setLocalProperty("spark.scheduler.pool", "pool1")

FAIR调度算法的实现类是:FairSchedulingAlgorithm。它通过minShare,runningTasks和weight的变量值来比较调度的优先级。FAIR调度的参数说明如下:

配置参数 参数说你应
schedulingMode 调度模式,有两种:FIFO和FAIR。
weight 控制相对于其他池的池在集群中的份额。默认情况下,所有池的权重均为1。例如,如果给特定池赋予权重2,则它将获得比其他活动池多2倍的资源。设置较高的权重(例如1000)还可以在池之间实现优先级-本质上,权重1000的池总是在作业处于活动状态时首先启动任务。
minShare: 除了总的权重外,还可以为每个池分配管理员希望拥有的最小份额(作为CPU核心数)。公平的调度程序总是尝试满足所有活动池的最小份额,然后根据权重重新分配额外的资源。因此,minShare属性可以是确保池始终快速获取特定数量的资源(例如10个核心)的另一种方式,而不会为集群的其余部分赋予较高的优先级。默认情况下,每个池的minShare为0。

使用FAIR调度算法时,对于两个调度实体s1和s2,通过以下流程来决定s1和s2的调度顺序:

  1. 若s1的正在运行的任务数小于s1的minShare参数,而相反s2的却是大于,则s1优先被调度。反过来,若s2的任务数小于s2的minShare参数,而s1却大于,则s2优先被调度。若s1和s2的运行任务数,都小于各自的minShare参数,则跳到第2步;若s1和s2的运行任务数,都大于于各自的minShare参数,则跳到第3步。

  2. 计算每个调度对象的运行任务数和minShare的比值,比值小的优先级高。若比值相等,跳到

  3. 计算运行任务数和调度实体的权重(weigth)的比值,比值小的优先级高。若比值相等,跳到第4步。

  4. 若前面的比较都相等,则最后会直接比较两个调度实体的name的ascii码,较小的优先级高。

FAIR调度的示意图如下:

如图2所示,在通过FAIR调度算法进行调度时,先确定挂载到rootPool下的各个pool的优先级,然后再确定每个pool中各个调度对象的优先级,不同的Pool可以配置不同的调度算法,最后提交的顺序由调度算法进行排序后的调度队列的顺序确定。

调度对象创建合约

在任务调度器中有两类调度对象,这两类调度对象都必须实现接口:Schedulable。该接口的主要成员如下表:

从调度对象的接口定义中可以看出以下几点:

  1. 每个调度对象都属于某个stage

  2. 所有的调度对象都被保存到一个同步队列中

  3. 调度对象的调度顺序是根据调度算法来决定的,调度算法有两种:FIFO和FAIR。

小结

本文讲述了TaskSet提交的过程,并介绍了Spark提供的两种调度算法的实现。

以上是关于Spark的Task调度原理的主要内容,如果未能解决你的问题,请参考以下文章

Spark源码分析之六:Task调度

Spark源码分析之七:Task运行

spark的task调度器(FAIR公平调度算法)

大数据:Spark CoreDriver上的Task的生成分配调度

大数据:Spark CoreDriver上的Task的生成分配调度

spark DAGSchedulerTaskScheduleExecutor执行task源码分析