Spark源码分析之七:Task运行

Posted 吉日木图

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark源码分析之七:Task运行相关的知识,希望对你有一定的参考价值。

在Task调度相关的两篇文章《Spark源码分析之五:Task调度(一)》《Spark源码分析之六:Task调度(二)》中,我们大致了解了Task调度相关的主要逻辑,并且在Task调度逻辑的最后,CoarseGrainedSchedulerBackend的内部类DriverEndpoint中的makeOffers()方法的最后,我们通过调用TaskSchedulerImpl的resourceOffers()方法,得到了TaskDescription序列的序列Seq[Seq[TaskDescription]],相关代码如下:

 

[java] view plain copy
 
 技术分享技术分享
  1. // 调用scheduler的resourceOffers()方法,分配资源,并在得到资源后,调用launchTasks()方法,启动tasks  
  2.       // 这个scheduler就是TaskSchedulerImpl  
  3.       launchTasks(scheduler.resourceOffers(workOffers))  
[java] view plain copy
 
 技术分享技术分享
  1. /** 
  2.    * Called by cluster manager to offer resources on slaves. We respond by asking our active task 
  3.    * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so 
  4.    * that tasks are balanced across the cluster. 
  5.    * 
  6.    * 被集群manager调用以提供slaves上的资源。我们通过按照优先顺序询问活动task集中的task来回应。 
  7.    * 我们通过循环的方式将task调度到每个节点上以便tasks在集群中可以保持大致的均衡。 
  8.    */  
  9.   def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {  

        这个TaskDescription很简单,是传递到executor上即将被执行的Task的描述,通常由TaskSetManager的resourceOffer()方法生成。代码如下:

 

 

[java] view plain copy
 
 技术分享技术分享
  1. /** 
  2.  * Description of a task that gets passed onto executors to be executed, usually created by 
  3.  * [[TaskSetManager.resourceOffer]]. 
  4.  */  
  5. private[spark] class TaskDescription(  
  6.     val taskId: Long,  
  7.     val attemptNumber: Int,  
  8.     val executorId: String,  
  9.     val name: String,  
  10.     val index: Int,    // Index within this task‘s TaskSet  
  11.     _serializedTask: ByteBuffer)  
  12.   extends Serializable {  
  13.   
  14.   // Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer  
  15.   // 由于ByteBuffers不可以被序列化,所以将task包装在SerializableBuffer中,_serializedTask为ByteBuffer类型的Task  
  16.   private val buffer = new SerializableBuffer(_serializedTask)  
  17.     
  18.   // 序列化后的Task, 取buffer的value  
  19.   def serializedTask: ByteBuffer = buffer.value  
  20.   
  21.   
  22.   override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)  
  23. }  

        此时,得到Seq[Seq[TaskDescription]],即Task被调度到相应executor上后(仅是逻辑调度,实际上并未分配到executor上执行),接下来要做的,便是真正的将Task分配到指定的executor上去执行,也就是本篇我们将要讲的Task的运行。而这部分的开端,源于上述提到的CoarseGrainedSchedulerBackend的内部类DriverEndpoint中的launchTasks()方法,代码如下:

 

 

[java] view plain copy
 
 技术分享技术分享
  1. // Launch tasks returned by a set of resource offers  
  2.     private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {  
  3.       
  4.       // 循环每个task  
  5.       for (task <- tasks.flatten) {  
  6.           
  7.         // 序列化Task  
  8.         val serializedTask = ser.serialize(task)  
  9.           
  10.         // 序列化后的task的大小超出规定的上限  
  11.         // 即如果序列化后task的大小大于等于框架配置的Akka消息最大大小减去除序列化task或task结果外,一个Akka消息需要保留的额外大小的值  
  12.         if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {  
  13.             
  14.           // 根据task的taskId,在TaskSchedulerImpl的taskIdToTaskSetManager中获取对应的TaskSetManager  
  15.           scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>  
  16.             try {  
  17.               var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +  
  18.                 "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +  
  19.                 "spark.akka.frameSize or using broadcast variables for large values."  
  20.               msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,  
  21.                 AkkaUtils.reservedSizeBytes)  
  22.                 
  23.               // 调用TaskSetManager的abort()方法,标记对应TaskSetManager为失败  
  24.               taskSetMgr.abort(msg)  
  25.             } catch {  
  26.               case e: Exception => logError("Exception in error callback", e)  
  27.             }  
  28.           }  
  29.         }  
  30.         else {// 序列化后task的大小在规定的大小内  
  31.             
  32.           // 从executorDataMap中,根据task.executorId获取executor描述信息executorData  
  33.           val executorData = executorDataMap(task.executorId)  
  34.             
  35.           // executorData中,freeCores做相应减少  
  36.           executorData.freeCores -= scheduler.CPUS_PER_TASK  
  37.             
  38.           // 利用executorData中的executorEndpoint,发送LaunchTask事件,LaunchTask事件中包含序列化后的task  
  39.           executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))  
  40.         }  
  41.       }  
  42.     }  

        launchTasks的执行逻辑很简单,针对传入的TaskDescription序列,循环每个Task,做以下处理:

 

        1、首先对Task进行序列化,得到serializedTask;

        2、针对序列化后的Task:serializedTask,判断其大小:

              2.1、序列化后的task的大小达到或超出规定的上限,即框架配置的Akka消息最大大小,减去除序列化task或task结果外,一个Akka消息需要保留的额外大小的值,则根据task的taskId,在TaskSchedulerImpl的taskIdToTaskSetManager中获取对应的TaskSetManager,并调用其abort()方法,标记对应TaskSetManager为失败;

              2.2、序列化后的task的大小未达到上限,在规定的大小范围内,则:

                       2.2.1、从executorDataMap中,根据task.executorId获取executor描述信息executorData;

                       2.2.2、在executorData中,freeCores做相应减少;

                       2.2.3、利用executorData中的executorEndpoint,即Driver端executor通讯端点的引用,发送LaunchTask事件,LaunchTask事件中包含序列化后的task,将Task传递到executor中去执行。

        接下来,我们重点分析下上述流程。

        先说下异常流程,即序列化后Task的大小超过上限时,对TaskSet标记为失败的处理。入口方法为TaskSetManager的abort()方法,代码如下:

 

[java] view plain copy
 
 技术分享技术分享
  1. def abort(message: String, exception: Option[Throwable] = None): Unit = sched.synchronized {  
  2.       
  3.     // TODO: Kill running tasks if we were not terminated due to a Mesos error  
  4.     // 调用DAGScheduler的taskSetFailed()方法,标记TaskSet运行失败  
  5.     sched.dagScheduler.taskSetFailed(taskSet, message, exception)  
  6.       
  7.     // 标志位isZombie设置为true  
  8.     isZombie = true  
  9.       
  10.     // 满足一定条件的情况下,将TaskSet标记为Finished  
  11.     maybeFinishTaskSet()  
  12.   }  

        abort()方法处理逻辑共分三步:

 

        第一,调用DAGScheduler的taskSetFailed()方法,标记TaskSet运行失败;

        第二,标志位isZombie设置为true;

        第三,满足一定条件的情况下,将TaskSet标记为Finished。

        首先看下DAGScheduler的taskSetFailed()方法,代码如下:

 

[java] view plain copy
 
 技术分享技术分享
  1. /** 
  2.    * Called by the TaskSetManager to cancel an entire TaskSet due to either repeated failures or 
  3.    * cancellation of the job itself. 
  4.    */  
  5.   def taskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable]): Unit = {  
  6.     eventProcessLoop.post(TaskSetFailed(taskSet, reason, exception))  
  7.   }  

        和第二篇文章《Spark源码分析之二:Job的调度模型与运行反馈》中Job的调度模型一致,都是依靠事件队列eventProcessLoop来完成事件的调度执行的,这里,我们在事件队列eventProcessLoop中放入了一个TaskSetFailed事件。在DAGScheduler的事件处理调度函数doOnReceive()方法中,明确规定了事件的处理方法,代码如下:

 

 

[java] view plain copy
 
 技术分享技术分享
  1. // 如果是TaskSetFailed事件,调用dagScheduler.handleTaskSetFailed()方法处理  
  2.     case TaskSetFailed(taskSet, reason, exception) =>  
  3.       dagScheduler.handleTaskSetFailed(taskSet, reason, exception)  

        下面,我们看下handleTaskSetFailed()这个方法。

 

[java] view plain copy
 
 技术分享技术分享
  1. private[scheduler] def handleTaskSetFailed(  
  2.       taskSet: TaskSet,  
  3.       reason: String,  
  4.       exception: Option[Throwable]): Unit = {  
  5.       
  6.     // 根据taskSet的stageId获取到对应的Stage,循环调用abortStage,终止该Stage  
  7.     stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason, exception) }  
  8.       
  9.     // 提交等待的Stages  
  10.     submitWaitingStages()  
  11.   }  

        很简单,首先通过taskSet的stageId获取到对应的Stage,针对Stage,循环调用abortStage()方法,终止该Stage,然后调用submitWaitingStages()方法提交等待的Stages。我们先看下abortStage()方法,代码如下:

 

 

[java] view plain copy
 
 技术分享技术分享
  1. /** 
  2.    * Aborts all jobs depending on a particular Stage. This is called in response to a task set 
  3.    * being canceled by the TaskScheduler. Use taskSetFailed() to inject this event from outside. 
  4.    * 终止给定Stage上的所有Job。 
  5.    */  
  6.   private[scheduler] def abortStage(  
  7.       failedStage: Stage,  
  8.       reason: String,  
  9.       exception: Option[Throwable]): Unit = {  
  10.       
  11.     // 如果stageIdToStage中不存在对应的stage,说明stage已经被移除,直接返回  
  12.     if (!stageIdToStage.contains(failedStage.id)) {  
  13.       // Skip all the actions if the stage has been removed.  
  14.       return  
  15.     }  
  16.       
  17.     // 遍历activeJobs中的ActiveJob,逐个调用stageDependsOn()方法,找出存在failedStage的祖先stage的activeJob,即dependentJobs  
  18.     val dependentJobs: Seq[ActiveJob] =  
  19.       activeJobs.filter(job => stageDependsOn(job.finalStage, failedStage)).toSeq  
  20.       
  21.     // 标记failedStage的完成时间completionTime  
  22.     failedStage.latestInfo.completionTime = Some(clock.getTimeMillis())  
  23.       
  24.     // 遍历dependentJobs,调用failJobAndIndependentStages()  
  25.     for (job <- dependentJobs) {  
  26.       failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason", exception)  
  27.     }  
  28.     if (dependentJobs.isEmpty) {  
  29.       logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done")  
  30.     }  
  31.   }  

        这个方法的处理逻辑主要分为四步:

 

        1、如果stageIdToStage中不存在对应的stage,说明stage已经被移除,直接返回,这是对异常情况下的一种特殊处理;

        2、遍历activeJobs中的ActiveJob,逐个调用stageDependsOn()方法,找出存在failedStage的祖先stage的activeJob,即dependentJobs;

        3、标记failedStage的完成时间completionTime;

        4、遍历dependentJobs,调用failJobAndIndependentStages()。

        其它都好说,我们主要看下stageDependsOn()和failJobAndIndependentStages()这两个方法。首先看下stageDependsOn()方法,代码如下:

 

[java] view plain copy
 
 技术分享技术分享
  1. /** Return true if one of stage‘s ancestors is target. */  
  2.   // 如果参数stage的祖先是target,返回true  
  3.   private def stageDependsOn(stage: Stage, target: Stage): Boolean = {  
  4.       
  5.     // 如果stage即为target,返回true  
  6.     if (stage == target) {  
  7.       return true  
  8.     }  
  9.       
  10.     // 存储处理过的RDD  
  11.     val visitedRdds = new HashSet[RDD[_]]  
  12.       
  13.     // We are manually maintaining a stack here to prevent StackOverflowError  
  14.     // caused by recursively visiting  
  15.     // 存储待处理的RDD  
  16.     val waitingForVisit = new Stack[RDD[_]]  
  17.       
  18.     // 定义一个visit()方法  
  19.     def visit(rdd: RDD[_]) {  
  20.       // 如果该RDD未被处理过的话,继续处理  
  21.       if (!visitedRdds(rdd)) {  
  22.         // 将RDD添加到visitedRdds中  
  23.         visitedRdds += rdd  
  24.           
  25.         // 遍历RDD的依赖  
  26.         for (dep <- rdd.dependencies) {  
  27.           dep match {  
  28.             // 如果是ShuffleDependency  
  29.             case shufDep: ShuffleDependency[_, _, _] =>  
  30.               
  31.               // 获得mapStage,并且如果stage的isAvailable为false的话,将其压入waitingForVisit  
  32.               val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)  
  33.               if (!mapStage.isAvailable) {  
  34.                 waitingForVisit.push(mapStage.rdd)  
  35.               }  // Otherwise there‘s no need to follow the dependency back  
  36.             // 如果是NarrowDependency,直接将其压入waitingForVisit  
  37.             case narrowDep: NarrowDependency[_] =>  
  38.               waitingForVisit.push(narrowDep.rdd)  
  39.           }  
  40.         }  
  41.       }  
  42.     }  
  43.       
  44.     // 从stage的rdd开始处理,将其入栈waitingForVisit  
  45.     waitingForVisit.push(stage.rdd)  
  46.       
  47.     // 当waitingForVisit中存在数据,就调用visit()方法进行处理  
  48.     while (waitingForVisit.nonEmpty) {  
  49.       visit(waitingForVisit.pop())  
  50.     }  
  51.       
  52.     // 根据visitedRdds中是否存在target的rdd判断参数stage的祖先是否为target  
  53.     visitedRdds.contains(target.rdd)  
  54.   }  

        这个方法主要是判断参数stage是否为参数target的祖先stage,其代码风格与stage划分和提交中的部分代码一样,这在前面的两篇文章中也提到过,在此不再赘述。而它主要是通过stage的rdd,并遍历其上层依赖的rdd链,将每个stage的rdd加入到visitedRdds中,最后根据visitedRdds中是否存在target的rdd判断参数stage的祖先是否为target。值得一提的是,如果RDD的依赖是NarrowDependency,直接将其压入waitingForVisit,如果为ShuffleDependency,则需要判断stage的isAvailable,如果为false,则将对应RDD压入waitingForVisit。关于isAvailable,我在《Spark源码分析之四:Stage提交》一文中具体阐述过,这里不再赘述。

        接下来,我们再看下failJobAndIndependentStages()方法,这个方法的主要作用就是使得一个Job和仅被该Job使用的所有stages失败,并清空有关状态。代码如下:

 

[java] view plain copy
 
 技术分享技术分享
  1. /** Fails a job and all stages that are only used by that job, and cleans up relevant state. */  
  2.   // 使得一个Job和仅被该Job使用的所有stages失败,并清空有关状态  
  3.   private def failJobAndIndependentStages(  
  4.       job: ActiveJob,  
  5.       failureReason: String,  
  6.       exception: Option[Throwable] = None): Unit = {  
  7.       
  8.     // 构造一个异常,内容为failureReason  
  9.     val error = new SparkException(failureReason, exception.getOrElse(null))  
  10.       
  11.     // 标志位,是否能取消Stages  
  12.     var ableToCancelStages = true  
  13.   
  14.     // 标志位,是否应该中断线程  
  15.     val shouldInterruptThread =  
  16.       if (job.properties == null) false  
  17.       else job.properties.getProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false").toBoolean  
  18.   
  19.     // Cancel all independent, running stages.  
  20.     // 取消所有独立的,正在运行的stages  
  21.       
  22.     // 根据Job的jobId,获取其stages  
  23.     val stages = jobIdToStageIds(job.jobId)  
  24.       
  25.     // 如果stages为空,记录错误日志  
  26.     if (stages.isEmpty) {  
  27.       logError("No stages registered for job " + job.jobId)  
  28.     }  
  29.       
  30.     // 遍历stages,循环处理  
  31.     stages.foreach { stageId =>  
  32.         
  33.       // 根据stageId,获取jobsForStage,即每个Job所包含的Stage信息  
  34.       val jobsForStage: Option[HashSet[Int]] = stageIdToStage.get(stageId).map(_.jobIds)  
  35.         
  36.       // 首先处理异常情况,即jobsForStage为空,或者jobsForStage中不包含当前Job  
  37.       if (jobsForStage.isEmpty || !jobsForStage.get.contains(job.jobId)) {  
  38.         logError(  
  39.           "Job %d not registered for stage %d even though that stage was registered for the job"  
  40.             .format(job.jobId, stageId))  
  41.       } else if (jobsForStage.get.size == 1) {  
  42.         // 如果stageId对应的stage不存在  
  43.         if (!stageIdToStage.contains(stageId)) {  
  44.           logError(s"Missing Stage for stage with id $stageId")  
  45.         } else {  
  46.           // This is the only job that uses this stage, so fail the stage if it is running.  
  47.           //   
  48.           val stage = stageIdToStage(stageId)  
  49.           if (runningStages.contains(stage)) {  
  50.             try { // cancelTasks will fail if a SchedulerBackend does not implement killTask  
  51.                 
  52.               // 调用taskScheduler的cancelTasks()方法,取消stage内的tasks  
  53.               taskScheduler.cancelTasks(stageId, shouldInterruptThread)  
  54.                 
  55.               // 标记Stage为完成  
  56.               markStageAsFinished(stage, Some(failureReason))  
  57.             } catch {  
  58.               case e: UnsupportedOperationException =>  
  59.                 logInfo(s"Could not cancel tasks for stage $stageId", e)  
  60.               ableToCancelStages = false  
  61.             }  
  62.           }  
  63.         }  
  64.       }  
  65.     }  
  66.   
  67.     if (ableToCancelStages) {// 如果能取消Stages  
  68.       
  69.       // 调用job监听器的jobFailed()方法  
  70.       job.listener.jobFailed(error)  
  71.         
  72.       // 为Job和独立Stages清空状态,独立Stages的意思为该stage仅为该Job使用  
  73.       cleanupStateForJobAndIndependentStages(job)  
  74.         
  75.       // 发送一个SparkListenerJobEnd事件  
  76.       listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))  
  77.     }  
  78.   }  

        处理过程还是很简单的,读者可以通过上述源码和注释自行补脑,这里就先略过了。

 

        下面,再说下正常情况下,即序列化后Task大小未超过上限时,LaunchTask事件的发送及executor端的响应。代码再跳转到CoarseGrainedSchedulerBackend的内部类DriverEndpoint中的launchTasks()方法。正常情况下处理流程主要分为三大部分:

        1、从executorDataMap中,根据task.executorId获取executor描述信息executorData;

        2、在executorData中,freeCores做相应减少;

        3、利用executorData中的executorEndpoint,即Driver端executor通讯端点的引用,发送LaunchTask事件,LaunchTask事件中包含序列化后的task,将Task传递到executor中去执行。

        我们重点看下第3步,利用Driver端持有的executor描述信息executorData中的executorEndpoint,即Driver端executor通讯端点的引用,发送LaunchTask事件给executor,将Task传递到executor中去执行。那么executor中是如何接收LaunchTask事件的呢?答案就在CoarseGrainedExecutorBackend中。

        我们先说下这个CoarseGrainedExecutorBackend,类的定义如下所示:

 

[java] view plain copy
 
 技术分享技术分享
  1. private[spark] class CoarseGrainedExecutorBackend(  
  2.     override val rpcEnv: RpcEnv,  
  3.     driverUrl: String,  
  4.     executorId: String,  
  5.     hostPort: String,  
  6.     cores: Int,  
  7.     userClassPath: Seq[URL],  
  8.     env: SparkEnv)  
  9.   extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {  

        由上面的代码我们可以知道,它实现了ThreadSafeRpcEndpoint和ExecutorBackend两个trait,而ExecutorBackend的定义如下:

 

 

[java] view plain copy
 
 技术分享技术分享
  1. /** 
  2.  * A pluggable interface used by the Executor to send updates to the cluster scheduler. 
  3.  * 一个被Executor用来发送更新到集群调度器的可插拔接口。 
  4.  */  
  5. private[spark] trait ExecutorBackend {  
  6.     
  7.   // 唯一的一个statusUpdate()方法  
  8.   // 需要Long类型的taskId、TaskState类型的state、ByteBuffer类型的data三个参数  
  9.   def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)  
  10. }  

 

        那么它自然就有两种主要的任务,第一,作为endpoint提供driver与executor间的通讯功能;第二,提供了executor任务执行时状态汇报的功能。

        CoarseGrainedExecutorBackend到底是什么呢?这里我们先不深究,留到以后分析,你只要知道它是Executor的一个后台辅助进程,和Executor是一对一的关系,向Executor提供了与Driver通讯、任务执行时状态汇报两个基本功能即可。

        接下来,我们看下CoarseGrainedExecutorBackend是如何处理LaunchTask事件的。做为RpcEndpoint,在其处理各类事件或消息的receive()方法中,定义如下:

 

[java] view plain copy
 
 技术分享技术分享
  1. case LaunchTask(data) =>  
  2.       if (executor == null) {  
  3.         logError("Received LaunchTask command but executor was null")  
  4.         System.exit(1)  
  5.       } else {  
  6.         
  7.         // 反序列话task,得到taskDesc  
  8.         val taskDesc = ser.deserialize[TaskDescription](data.value)  
  9.         logInfo("Got assigned task " + taskDesc.taskId)  
  10.           
  11.         // 调用executor的launchTask()方法加载task  
  12.         executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,  
  13.           taskDesc.name, taskDesc.serializedTask)  
  14.       }  

        首先,会判断对应的executor是否为空,为空的话,记录错误日志并退出,不为空的话,则按照如下流程处理:

 

        1、反序列话task,得到taskDesc;

        2、调用executor的launchTask()方法加载task

以上是关于Spark源码分析之七:Task运行的主要内容,如果未能解决你的问题,请参考以下文章

Spark 源码分析系列

Spark源码分析之六:Task调度

Spark源码分析之Checkpoint的过程

7. spark源码分析(基于yarn cluster模式)- Task划分提交

spark DAGSchedulerTaskScheduleExecutor执行task源码分析

8. spark源码分析(基于yarn cluster模式)- Task执行,Map端写入实现