Spark源码分析之六:Task调度

Posted 吉日木图

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark源码分析之六:Task调度相关的知识,希望对你有一定的参考价值。

话说在《Spark源码分析之五:Task调度(一)》一文中,我们对Task调度分析到了DriverEndpoint的makeOffers()方法。这个方法针对接收到的ReviveOffers事件进行处理。代码如下:

 

[java] view plain copy
 
 技术分享技术分享
  1. // Make fake resource offers on all executors  
  2.     // 在所有的executors上提供假的资源(抽象的资源,也就是资源的对象信息,我是这么理解的)  
  3.     private def makeOffers() {  
  4.       // Filter out executors under killing  
  5.       // 过滤掉under killing的executors  
  6.       val activeExecutors = executorDataMap.filterKeys(executorIsAlive)  
  7.         
  8.       // 利用activeExecutors中executorData的executorHost、freeCores,构造workOffers,即资源  
  9.       val workOffers = activeExecutors.map { case (id, executorData) =>  
  10.         // 创建WorkerOffer对象  
  11.         new WorkerOffer(id, executorData.executorHost, executorData.freeCores)  
  12.       }.toSeq  
  13.         
  14.       // 调用scheduler的resourceOffers()方法,分配资源,并调用launchTasks()方法,启动tasks  
  15.       // 这个scheduler就是TaskSchedulerImpl  
  16.       launchTasks(scheduler.resourceOffers(workOffers))  
  17.     }  

        代码逻辑很简单,一共分为三步:

 

        第一,从executorDataMap中过滤掉under killing的executors,得到activeExecutors;

        第二,利用activeExecutors中executorData的executorHost、freeCores,获取workOffers,即资源;

        第三,调用scheduler的resourceOffers()方法,分配资源,并调用launchTasks()方法,启动tasks:这个scheduler就是TaskSchedulerImpl。

        我们逐个进行分析,首先看看这个executorDataMap,其定义如下:

 

[java] view plain copy
 
 技术分享技术分享
  1. private val executorDataMap = new HashMap[String, ExecutorData]  

        它是CoarseGrainedSchedulerBackend掌握的集群中executor的数据集合,key为String类型的executorId,value为ExecutorData类型的executor详细信息。ExecutorData包含的主要内容如下:

 

        1、executorEndpoint:RpcEndpointRef类型,RPC终端的引用,用于数据通信;

        2、executorAddress:RpcAddress类型,RPC地址,用于数据通信;

        3、executorHost:String类型,executor的主机;

        4、freeCores:Int类型,可用处理器cores;

        5、totalCores:Int类型,处理器cores总数;

        6、logUrlMap:Map[String, String]类型,日志url映射集合。

        这样,通过executorDataMap这个集合我们就能知道集群当前executor的负载情况,方便资源分析并调度任务。那么executorDataMap内的数据是何时及如何更新的呢?go on,继续分析。
        对于第一步中,过滤掉under killing的executors,其实现是对executorDataMap中的所有executor调用executorIsAlive()方法中,判断是否在executorsPendingToRemove和executorsPendingLossReason两个数据结构中,这两个数据结构中的executors,都是即将移除或者已丢失的executor。

        第二步,在过滤掉已失效或者马上要失效的executor后,利用activeExecutors中executorData的executorHost、freeCores,构造workOffers,即资源,这个workOffers更简单,是一个WorkerOffer对象,它代表了系统的可利用资源。WorkerOffer代码如下:

 

[java] view plain copy
 
 技术分享技术分享
  1. /** 
  2.  * Represents free resources available on an executor. 
  3.  */  
  4. private[spark]  
  5. case class WorkerOffer(executorId: String, host: String, cores: Int)  

        而最重要的第三步,先是调用scheduler.resourceOffers(workOffers),即TaskSchedulerImpl的resourceOffers()方法,然后再调用launchTasks()方法将tasks加载到executor上去执行。

 

        我们先看下TaskSchedulerImpl的resourceOffers()方法。代码如下:

 

[java] view plain copy
 
 技术分享技术分享
  1. /** 
  2.    * Called by cluster manager to offer resources on slaves. We respond by asking our active task 
  3.    * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so 
  4.    * that tasks are balanced across the cluster. 
  5.    * 
  6.    * 被集群manager调用以提供slaves上的资源。我们通过按照优先顺序询问活动task集中的task来回应。 
  7.    * 我们通过循环的方式将task调度到每个节点上以便tasks在集群中可以保持大致的均衡。 
  8.    */  
  9.   def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {  
  10.       
  11.     // Mark each slave as alive and remember its hostname  
  12.     // Also track if new executor is added  
  13.     // 标记每个slave节点为alive活跃的,并且记住它的主机名  
  14.     // 同时也追踪是否有executor被加入  
  15.     var newExecAvail = false  
  16.       
  17.     // 循环offers,WorkerOffer为包含executorId、host、cores的结构体,代表集群中的可用executor资源  
  18.     for (o <- offers) {  
  19.         
  20.       // 利用HashMap存储executorId->host映射的集合  
  21.       executorIdToHost(o.executorId) = o.host  
  22.         
  23.       // Number of tasks running on each executor  
  24.       // 每个executor上运行的task的数目,这里如果之前没有的话,初始化为0  
  25.       executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)  
  26.         
  27.       // 每个host上executors的集合  
  28.       // 这个executorsByHost被用来计算host活跃性,反过来我们用它来决定在给定的主机上何时实现数据本地性  
  29.       if (!executorsByHost.contains(o.host)) {// 如果executorsByHost中不存在对应的host  
  30.           
  31.         // executorsByHost中添加一条记录,key为host,value为new HashSet[String]()  
  32.         executorsByHost(o.host) = new HashSet[String]()  
  33.           
  34.         // 发送一个ExecutorAdded事件,并由DAGScheduler的handleExecutorAdded()方法处理  
  35.         // eventProcessLoop.post(ExecutorAdded(execId, host))  
  36.         // 调用DAGScheduler的executorAdded()方法处理  
  37.         executorAdded(o.executorId, o.host)  
  38.           
  39.         // 新的slave加入时,标志位newExecAvail设置为true  
  40.         newExecAvail = true  
  41.       }  
  42.         
  43.       // 更新hostsByRack  
  44.       for (rack <- getRackForHost(o.host)) {  
  45.         hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host  
  46.       }  
  47.     }  
  48.   
  49.     // Randomly shuffle offers to avoid always placing tasks on the same set of workers.  
  50.     // 随机shuffle offers以避免总是把任务放在同一组workers上执行  
  51.     val shuffledOffers = Random.shuffle(offers)  
  52.       
  53.     // Build a list of tasks to assign to each worker.  
  54.     // 构造一个task列表,以分配到每个worker  
  55.     val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))  
  56.       
  57.     // 可以使用的cpu资源  
  58.     val availableCpus = shuffledOffers.map(o => o.cores).toArray  
  59.       
  60.     // 获得排序好的task集合  
  61.     // 先调用Pool.getSortedTaskSetQueue()方法  
  62.     // 还记得这个Pool吗,就是调度器中的调度池啊  
  63.     val sortedTaskSets = rootPool.getSortedTaskSetQueue  
  64.       
  65.     // 循环每个taskSet  
  66.     for (taskSet <- sortedTaskSets) {  
  67.       // 记录日志  
  68.       logDebug("parentName: %s, name: %s, runningTasks: %s".format(  
  69.         taskSet.parent.name, taskSet.name, taskSet.runningTasks))  
  70.         
  71.       // 如果存在新的活跃的executor(新的slave节点被添加时)  
  72.       if (newExecAvail) {  
  73.         // 调用executorAdded()方法  
  74.         taskSet.executorAdded()  
  75.       }  
  76.     }  
  77.   
  78.     // Take each TaskSet in our scheduling order, and then offer it each node in increasing order  
  79.     // of locality levels so that it gets a chance to launch local tasks on all of them.  
  80.     // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY  
  81.     var launchedTask = false  
  82.       
  83.     // 按照位置本地性规则调度每个TaskSet,最大化实现任务的本地性  
  84.     // 位置本地性规则的顺序是:PROCESS_LOCAL(同进程)、NODE_LOCAL(同节点)、NO_PREF、RACK_LOCAL(同机架)、ANY(任何)  
  85.     for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {  
  86.       do {  
  87.         // 调用resourceOfferSingleTaskSet()方法进行任务集调度  
  88.         launchedTask = resourceOfferSingleTaskSet(  
  89.             taskSet, maxLocality, shuffledOffers, availableCpus, tasks)  
  90.       } while (launchedTask)  
  91.     }  
  92.   
  93.     // 设置标志位hasLaunchedTask  
  94.     if (tasks.size > 0) {  
  95.       hasLaunchedTask = true  
  96.     }  
  97.       
  98.     return tasks  
  99.   }  

         首先来看下它的主体流程。如下:

 

        1、设置标志位newExecAvail为false,这个标志位是在新的slave被添加时被设置的一个标志,下面在计算任务的本地性规则时会用到;

        2、循环offers,WorkerOffer为包含executorId、host、cores的结构体,代表集群中的可用executor资源:

            2.1、更新executorIdToHost,executorIdToHost为利用HashMap存储executorId->host映射的集合;

            2.2、更新executorIdToTaskCount,executorIdToTaskCount为每个executor上运行的task的数目集合,这里如果之前没有的话,初始化为0;

            2.3、如果新的slave加入:

                2.3.1、executorsByHost中添加一条记录,key为host,value为new HashSet[String]();

                2.3.2、发送一个ExecutorAdded事件,并由DAGScheduler的handleExecutorAdded()方法处理;

                2.3.3、新的slave加入时,标志位newExecAvail设置为true;

            2.4、更新hostsByRack;

        3、随机shuffle offers(集群中可用executor资源)以避免总是把任务放在同一组workers上执行;

        4、构造一个task列表,以分配到每个worker,针对每个executor按照其上的cores数目构造一个cores数目大小的ArrayBuffer,实现最大程度并行化;

        5、获取可以使用的cpu资源availableCpus;

        6、调用Pool.getSortedTaskSetQueue()方法获得排序好的task集合,即sortedTaskSets;

        7、循环sortedTaskSets中每个taskSet:

               7.1、如果存在新加入的slave,则调用taskSet的executorAdded()方法,动态调整位置策略级别,这么做很容易理解,新的slave节点加入了,那么随之而来的是数据有可能存在于它上面,那么这时我们就需要重新调整任务本地性规则;

        8、循环sortedTaskSets,按照位置本地性规则调度每个TaskSet,最大化实现任务的本地性:

              8.1、对每个taskSet,调用resourceOfferSingleTaskSet()方法进行任务集调度;

        9、设置标志位hasLaunchedTask,并返回tasks。

        接下来,我们详细解释下其中的每个步骤。

        第1步不用讲,只是设置标志位newExecAvail为false,并且记住这个标志位是在新的slave被添加时被设置的一个标志,下面在计算任务的本地性规则时会用到;

        第2步是集群中的可用executor资源offers的循环处理,更新一些数据结构,并且,在新的slave加入时,标志位newExecAvail设置为true,并且发送一个ExecutorAdded事件,交由DAGScheduler的handleExecutorAdded()方法处理。我们来看下DAGScheduler的这个方法:

 

[java] view plain copy
 
 技术分享技术分享
  1. private[scheduler] def handleExecutorAdded(execId: String, host: String) {  
  2.     // remove from failedEpoch(execId) ?  
  3.     if (failedEpoch.contains(execId)) {  
  4.       logInfo("Host added was in lost list earlier: " + host)  
  5.       failedEpoch -= execId  
  6.     }  
  7.     submitWaitingStages()  
  8.   }  

        很简单,先将对应host从failedEpoch中移除,failedEpoch存储的是系统探测到的失效节点的集合,存储的是execId->host的对应关系。接下来便是调用submitWaitingStages()方法提交等待的stages。这个方法我们之前分析过,这里不再赘述。但是存在一个疑点,之前stage都已提交了,这里为什么还要提交一遍呢?留待以后再寻找答案吧。

 

        第3步随机shuffle offers以避免总是把任务放在同一组workers上执行,这也没什么特别好讲的,为了避免所谓的热点问题而采取的一种随机策略而已。

        第4步也是,构造一个task列表,以分配到每个worker,针对每个executor,创建一个ArrayBuffer,存储的类型为TaskDescription,大小为executor的cores,即最大程度并行化,充分利用executor的cores。

        第5步就是获取到上述executor集合中cores集合availableCpus,即可以使用的cpu资源;

        下面我们重点分析下第6步,它是调用Pool.getSortedTaskSetQueue()方法,获得排序好的task集合。还记得这个Pool吗?它就是上篇文章《Spark源码分析之五:Task调度(一)》里讲到的调度器的中的调度池啊,我们看下它的getSortedTaskSetQueue()方法。代码如下:

 

[java] view plain copy
 
 技术分享技术分享
  1. override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {  
  2.       
  3.     // 创建一个ArrayBuffer,存储TaskSetManager  
  4.     var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]  
  5.       
  6.     // schedulableQueue为Pool中的一个调度队列,里面存储的是TaskSetManager  
  7.     // 在TaskScheduler的submitTasks()方法中,通过层层调用,最终通过Pool的addSchedulable()方法将之前生成的TaskSetManager加入到schedulableQueue中  
  8.     // 而TaskSetManager包含具体的tasks  
  9.     // taskSetSchedulingAlgorithm为调度算法,包括FIFO和FAIR两种  
  10.     // 这里针对调度队列,<span style="font-family: Arial, Helvetica, sans-serif;">按照调度算法对其排序,</span>生成一个序列sortedSchedulableQueue,  
  11.     val sortedSchedulableQueue =  
  12.       schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)  
  13.       
  14.     // 循环sortedSchedulableQueue中所有的TaskSetManager,通过其getSortedTaskSetQueue来填充sortedTaskSetQueue  
  15.     for (schedulable <- sortedSchedulableQueue) {  
  16.       sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue  
  17.     }  
  18.       
  19.     // 返回sortedTaskSetQueue  
  20.     sortedTaskSetQueue  
  21.   }  

        首先,创建一个ArrayBuffer,用来存储TaskSetManager,然后,对Pool中已经存储好的TaskSetManager,即schedulableQueue队列,按照taskSetSchedulingAlgorithm调度规则或算法来排序,得到sortedSchedulableQueue,并循环其内的TaskSetManager,通过其getSortedTaskSetQueue()方法来填充sortedTaskSetQueue,最后返回。TaskSetManager的getSortedTaskSetQueue()方法也很简单,追加ArrayBuffer[TaskSetManager]即可,如下:

 

 

[java] view plain copy
 
 技术分享技术分享
  1. override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {  
  2.     var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]()  
  3.     sortedTaskSetQueue += this  
  4.     sortedTaskSetQueue  
  5.   }  

        我们着重来讲解下这个调度准则或算法taskSetSchedulingAlgorithm,其定义如下:

 

 

[java] view plain copy
 
 技术分享技术分享
  1. // 调度准则,包括FAIR和FIFO两种  
  2.   var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {  
  3.     schedulingMode match {  
  4.       case SchedulingMode.FAIR =>  
  5.         new FairSchedulingAlgorithm()  
  6.       case SchedulingMode.FIFO =>  
  7.         new FIFOSchedulingAlgorithm()  
  8.     }  
  9.   }  

        它包括两种,FAIR和FIFO,下面我们以FIFO为例来讲解。代码在SchedulingAlgorithm.scala中,如下:

[java] view plain copy
 
 技术分享技术分享
  1. private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {  
  2.   // 比较函数  
  3.   override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {  
  4.     val priority1 = s1.priority  
  5.     val priority2 = s2.priority  
  6.       
  7.     // 先比较priority,即优先级  
  8.     // priority相同的话,再比较stageId  
  9.     // 前者小于后者的话,返回true,否则为false  
  10.     var res = math.signum(priority1 - priority2)  
  11.     if (res == 0) {  
  12.       val stageId1 = s1.stageId  
  13.       val stageId2 = s2.stageId  
  14.       res = math.signum(stageId1 - stageId2)  
  15.     }  
  16.     if (res < 0) {  
  17.       true  
  18.     } else {  
  19.       false  
  20.     }  
  21.   }  
  22. }  

        很简单,就是先比较两个TaskSetManagerder的优先级priority,优先级相同再比较stageId。而这个priority在TaskSet生成时,就是jobId,也就是FIFO是先按照Job的顺序再按照Stage的顺序进行顺序调度,一个Job完了再调度另一个Job,Job内是按照Stage的顺序进行调度。关于priority生成的代码如下所示:

 

 

[java] view plain copy
 
 技术分享技术分享
  1. // 利用taskScheduler.submitTasks()提交task  
  2.       // jobId即为TaskSet的priority  
  3.       taskScheduler.submitTasks(new TaskSet(  
  4.         tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))  

 

        比较复杂的是FairSchedulingAlgorithm,代码如下:

[java] view plain copy
 
 技术分享技术分享
  1. private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {  
  2.   override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {  
  3.       
  4.     val minShare1 = s1.minShare  
  5.     val minShare2 = s2.minShare  
  6.     val runningTasks1 = s1.runningTasks  
  7.     val runningTasks2 = s2.runningTasks  
  8.     val s1Needy = runningTasks1 < minShare1  
  9.     val s2Needy = runningTasks2 < minShare2  
  10.     val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble  
  11.     val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble  
  12.     val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble  
  13.     val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble  
  14.     var compare: Int = 0  
  15.   
  16.     // 前者的runningTasks<minShare而后者相反的的话,返回true;  
  17.     // runningTasks为正在运行的tasks数目,minShare为最小共享cores数;  
  18.     // 前面两个if判断的意思是两个TaskSetManager中,如果其中一个正在运行的tasks数目小于最小共享cores数,则优先调度该TaskSetManager  
  19.     if (s1Needy && !s2Needy) {  
  20.       return true  
  21.     } else if (!s1Needy && s2Needy) {// 前者的runningTasks>=minShare而后者相反的的话,返回true  
  22.       return false  
  23.     } else if (s1Needy && s2Needy) {  
  24.       // 如果两者的正在运行的tasks数目都比最小共享cores数小的话,再比较minShareRatio  
  25.       // minShareRatio为正在运行的tasks数目与最小共享cores数的比率  
  26.       compare = minShareRatio1.compareTo(minShareRatio2)  
  27.     } else {  
  28.       // 最后比较taskToWeightRatio,即权重使用率,weight代表调度池对资源获取的权重,越大需要越多的资源  
  29.       compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)  
  30.     }  
  31.   
  32.     if (compare < 0) {  
  33.       true  
  34.     } else if (compare > 0) {  
  35.       false  
  36.     } else {  
  37.       s1.name < s2.name  
  38.     }  
  39.   }  
  40. }  

        它的调度逻辑主要如下:

 

        1、优先看正在运行的tasks数目是否小于最小共享cores数,如果两者只有一个小于,则优先调度小于的那个,原因是既然正在运行的Tasks数目小于共享cores数,说明该节点资源比较充足,应该优先利用;

        2、如果不是只有一个的正在运行的tasks数目是否小于最小共享cores数的话,则再判断正在运行的tasks数目与最小共享cores数的比率;

        3、最后再比较权重使用率,即正在运行的tasks数目与该TaskSetManager的权重weight的比,weight代表调度池对资源获取的权重,越大需要越多的资源。

        到此为止,获得了排序好的task集合,我们来到了第7步:如果存在新加入的slave,则调用taskSet的executorAdded()方法,即TaskSetManager的executorAdded()方法,代码如下:

 

[java] view plain copy
 
 技术分享技术分享
  1. def executorAdded() {  
  2.     recomputeLocality()  
  3.   }  

        没说的,继续追踪,看recomputeLocality()方法。代码如下:

 

[java] view plain copy
 
 技术分享技术分享
  1. // 重新计算位置  
  2.   def recomputeLocality() {  
  3.     // 首先获取之前的位置Level  
  4.     // currentLocalityIndex为有效位置策略级别中的索引,默认为0  
  5.     val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)  
  6.       
  7.     // 计算有效的位置Level  
  8.     myLocalityLevels = computeValidLocalityLevels()  
  9.       
  10.     // 获得位置策略级别的等待时间  
  11.     localityWaits = myLocalityLevels.map(getLocalityWait)  
  12.       
  13.     // 设置当前使用的位置策略级别的索引  
  14.     currentLocalityIndex = getLocalityIndex(previousLocalityLevel)  
  15.   }  

        首先说下这个currentLocalityIndex,它的定义为:

[java] view plain copy
 
 技术分享技术分享
  1. var currentLocalityIndex = 0    // Index of our current locality level in validLocalityLevels  

        它是有效位置策略级别中的索引,指示当前的位置信息。也就是我们上一个task被launched所使用的Locality Level。

 

        接下来看下myLocalityLevels,它是任务集TaskSet中应该使用哪种位置Level的数组,在TaskSetManager对象实例化时即被初始化,变量定义如下:

 

[java] view plain copy
 
 技术分享技术分享
  1. // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling  
  2.   // 确定在我们的任务集TaskSet中应该使用哪种位置Level,以便我们做延迟调度  
  3.   var myLocalityLevels = computeValidLocalityLevels()  

        computeValidLocalityLevels()方法为计算该TaskSet使用的位置策略的方法,代码如下:

 

 

[java] view plain copy
 
 技术分享技术分享
  1. /** 
  2.    * Compute the locality levels used in this TaskSet. Assumes that all tasks have already been 
  3.    * added to queues using addPendingTask. 
  4.    * 计算该TaskSet使用的位置策略。假设所有的任务已经通过addPendingTask()被添加入队列 
  5.    */  
  6.   private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {  
  7.     // 引入任务位置策略  
  8.     import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}  
  9.       
  10.     // 创建ArrayBuffer类型的levels,存储TaskLocality  
  11.     val levels = new ArrayBuffer[TaskLocality.TaskLocality]  
  12.       
  13.     // 如果pendingTasksForExecutor不为空,且PROCESS_LOCAL级别中TaskSetManager等待分配下一个任务的时间不为零,且  
  14.     // 如果pendingTasksForExecutor中每个executorId在sched的executorIdToTaskCount中存在  
  15.     // executorIdToTaskCount为每个executor上运行的task的数目集合  
  16.     if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&  
  17.         pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {  
  18.       levels += PROCESS_LOCAL  
  19.     }  
  20.       
  21.     // 如果pendingTasksForHost不为空,且NODE_LOCAL级别中TaskSetManager等待分配下一个任务的时间不为零,且  
  22.     // 如果pe

以上是关于Spark源码分析之六:Task调度的主要内容,如果未能解决你的问题,请参考以下文章

Spark源代码分析之六:Task调度

spark DAGSchedulerTaskScheduleExecutor执行task源码分析

Spark源码分析之八:Task运行

深入理解spark-taskScheduler,schedulerBackend源码分析

Spark 源码分析系列

7. spark源码分析(基于yarn cluster模式)- Task划分提交