7. Spark Source Code Analysis (YARN Cluster Mode) - Task Splitting and Submission
Posted by Leo Han
This series is based on spark-2.4.6.
From the previous section's analysis, we saw that Spark ultimately submits a Stage through submitMissingTasks. In this section we look at its implementation and at how Tasks are split up and submitted.
private def submitMissingTasks(stage: Stage, jobId: Int) {
  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
  val properties = jobIdToActiveJob(jobId).properties
  runningStages += stage
  stage match {
    case s: ShuffleMapStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
    case s: ResultStage =>
      outputCommitCoordinator.stageStart(
        stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
  }
  val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
    stage match {
      case s: ShuffleMapStage =>
        partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
      case s: ResultStage =>
        partitionsToCompute.map { id =>
          val p = s.partitions(id)
          (id, getPreferredLocs(stage.rdd, p))
        }.toMap
    }
  } catch {
    case NonFatal(e) =>
      stage.makeNewStageAttempt(partitionsToCompute.size)
      listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }
  stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
  if (partitionsToCompute.nonEmpty) {
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  }
  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
  var taskBinary: Broadcast[Array[Byte]] = null
  var partitions: Array[Partition] = null
  try {
    var taskBinaryBytes: Array[Byte] = null
    RDDCheckpointData.synchronized {
      taskBinaryBytes = stage match {
        case stage: ShuffleMapStage =>
          JavaUtils.bufferToArray(
            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
        case stage: ResultStage =>
          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
      }
      partitions = stage.rdd.partitions
    }
    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    // In the case of a failure during serialization, abort the stage.
    case e: NotSerializableException =>
      abortStage(stage, "Task not serializable: " + e.toString, Some(e))
      runningStages -= stage
      // Abort execution
      return
    case e: Throwable =>
      abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }
  val tasks: Seq[Task[_]] = try {
    val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
    stage match {
      case stage: ShuffleMapStage =>
        stage.pendingPartitions.clear()
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = partitions(id)
          stage.pendingPartitions += id
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
            Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
        }
      case stage: ResultStage =>
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = partitions(p)
          val locs = taskIdToLocations(id)
          new ResultTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, id, properties, serializedTaskMetrics,
            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
            stage.rdd.isBarrier())
        }
    }
  } catch {
    case NonFatal(e) =>
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }
  if (tasks.size > 0) {
    logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
      s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
  } else {
    markStageAsFinished(stage, None)
    stage match {
      case stage: ShuffleMapStage =>
        markMapStageJobsAsFinished(stage)
      case stage: ResultStage =>
        logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
    }
    submitWaitingChildStages(stage)
  }
}
The code above is a bit long, so let's first recap: as the earlier sections showed, there are only two kinds of Stage in Spark:
- ShuffleMapStage
- ResultStage
The last stage submitted for any job is always a ResultStage.
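As a quick illustration (a made-up example, not taken from the post): a simple word count splits into one ShuffleMapStage for the map side of reduceByKey and one final ResultStage for the collect action. The input path below is hypothetical.

import org.apache.spark.{SparkConf, SparkContext}

object StageDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("stage-demo"))
    val counts = sc.textFile("hdfs:///tmp/input")   // hypothetical input path
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)   // shuffle dependency -> ShuffleMapStage
      .collect()            // action -> the final ResultStage
    counts.take(10).foreach(println)
    sc.stop()
  }
}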
The method first calls val partitionsToCompute: Seq[Int] = stage.findMissingPartitions() to find the partitions of the current Stage that have not been computed yet:
// ShuffleMapStage
override def findMissingPartitions(): Seq[Int] = {
  mapOutputTrackerMaster
    .findMissingPartitions(shuffleDep.shuffleId)
    .getOrElse(0 until numPartitions)
}

// MapOutputTrackerMaster
def findMissingPartitions(shuffleId: Int): Option[Seq[Int]] = {
  shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
}

// ShuffleStatus
def findMissingPartitions(): Seq[Int] = synchronized {
  val missing = (0 until numPartitions).filter(id => mapStatuses(id) == null)
  assert(missing.size == numPartitions - _numAvailableOutputs,
    s"${missing.size} missing, expected ${numPartitions - _numAvailableOutputs}")
  missing
}
As you can see, for each ShuffleMapStage (that is, for each shuffle) Spark keeps a ShuffleStatus on the MapOutputTrackerMaster that records, per partition, whether a map output has been registered yet.
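To make that bookkeeping concrete, here is a minimal sketch of the idea (a toy class, not the real ShuffleStatus): an array of map outputs indexed by partition, where a partition counts as missing until its output is registered.

class ToyShuffleStatus(numPartitions: Int) {
  // The real class stores MapStatus objects; AnyRef is enough for the sketch.
  private val mapStatuses = new Array[AnyRef](numPartitions)

  // Called when a map task finishes and registers its output for a partition.
  def addMapOutput(partition: Int, status: AnyRef): Unit = synchronized {
    mapStatuses(partition) = status
  }

  // A partition is missing as long as no map output has been registered for it.
  def findMissingPartitions(): Seq[Int] = synchronized {
    (0 until numPartitions).filter(mapStatuses(_) == null)
  }
}

For example, after registering output only for partition 0 of a 4-partition shuffle, findMissingPartitions() returns Seq(1, 2, 3).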
outputCommitCoordinator.stageStart marks the current Stage as started in the OutputCommitCoordinator. Next, the preferred locations of each partition's data are computed via getPreferredLocs, so that when tasks are later handed to Executors they can run as close to their data as possible.
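As a side note on how these preferences are used later (a hedged configuration example, not code from this post): how long the scheduler waits for a data-local executor before degrading the locality level is governed by the locality-wait settings; the values below are just the documented defaults.

import org.apache.spark.SparkConf

// Locality-wait settings control how long the scheduler waits for a preferred
// location before degrading through PROCESS_LOCAL -> NODE_LOCAL -> RACK_LOCAL -> ANY.
val conf = new SparkConf()
  .set("spark.locality.wait", "3s")          // base wait before degrading one locality level
  .set("spark.locality.wait.process", "3s")  // override for PROCESS_LOCAL
  .set("spark.locality.wait.node", "3s")     // override for NODE_LOCAL
  .set("spark.locality.wait.rack", "3s")     // override for RACK_LOCAL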
Then the ShuffleMapTasks are created, one ShuffleMapTask per partition to compute:
case stage: ShuffleMapStage =>
  stage.pendingPartitions.clear()
  partitionsToCompute.map { id =>
    val locs = taskIdToLocations(id)
    val part = partitions(id)
    stage.pendingPartitions += id
    new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
      taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
      Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
  }
At this point all ShuffleMapTasks of the ShuffleMapStage have been created; they are wrapped into a TaskSet and submitted through the taskScheduler:
taskScheduler.submitTasks(new TaskSet(
  tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
In yarn cluster mode this is ultimately handled by TaskSchedulerImpl:
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets.foreach { case (_, ts) =>
      ts.isZombie = true
    }
    stageTaskSets(taskSet.stageAttemptId) = manager
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  backend.reviveOffers()
}
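A brief note on the schedulableBuilder seen above: which kind of pool the new TaskSetManager lands in depends on the scheduling mode, FIFO by default or FAIR. A small configuration example (the allocation file path is hypothetical):

import org.apache.spark.SparkConf

// Switch from the default FIFO pool to fair scheduling; this changes which
// SchedulableBuilder the TaskSchedulerImpl constructs.
val conf = new SparkConf()
  .setAppName("scheduler-mode-demo")
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/etc/spark/fairscheduler.xml") // hypothetical path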
Finally, backend.reviveOffers() is called to notify the Driver; its implementation:
override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}
On the Driver side the ReviveOffers message is handled as follows:
override def receive: PartialFunction[Any, Unit] = {
  case StatusUpdate(executorId, taskId, state, data) =>
    scheduler.statusUpdate(taskId, state, data.value)
    if (TaskState.isFinished(state)) {
      executorDataMap.get(executorId) match {
        case Some(executorInfo) =>
          executorInfo.freeCores += scheduler.CPUS_PER_TASK
          makeOffers(executorId)
        case None =>
      }
    }

  case ReviveOffers =>
    makeOffers()

  case KillTask(taskId, executorId, interruptThread, reason) =>
    executorDataMap.get(executorId) match {
      case Some(executorInfo) =>
        executorInfo.executorEndpoint.send(
          KillTask(taskId, executorId, interruptThread, reason))
      case None =>
    }

  case KillExecutorsOnHost(host) =>
    scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
      killExecutors(exec.toSeq, adjustTargetNumExecutors = false, countFailures = false,
        force = true)
    }

  case UpdateDelegationTokens(newDelegationTokens) =>
    executorDataMap.values.foreach { ed =>
      ed.executorEndpoint.send(UpdateDelegationTokens(newDelegationTokens))
    }

  case RemoveExecutor(executorId, reason) =>
    executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
    removeExecutor(executorId, reason)
}
The ReviveOffers case calls the makeOffers method, which ends up calling launchTasks to launch the tasks:
private def makeOffers() {
  // Make sure no executor is killed while some task is launching on it
  val taskDescs = withLock {
    // Filter out executors under killing
    val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
    val workOffers = activeExecutors.map {
      case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
          Some(executorData.executorAddress.hostPort))
    }.toIndexedSeq
    scheduler.resourceOffers(workOffers)
  }
  if (!taskDescs.isEmpty) {
    launchTasks(taskDescs)
  }
}
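launchTasks itself is not shown here; roughly speaking, it encodes each TaskDescription, aborts anything larger than the RPC message size limit, deducts the chosen executor's free cores, and sends a LaunchTask message to that executor's endpoint. Below is only a self-contained toy sketch of that flow, with made-up types instead of Spark's internal classes:

import java.nio.ByteBuffer
import scala.collection.mutable

// Toy stand-ins for TaskDescription and ExecutorData; names are illustrative only.
final case class ToyTaskDescription(taskId: Long, executorId: String, payload: Array[Byte])
final class ToyExecutorData(var freeCores: Int)

// Sketch of the launchTasks flow: size check, core accounting, then send.
def launchTasksSketch(
    tasks: Seq[ToyTaskDescription],
    executors: mutable.Map[String, ToyExecutorData],
    maxRpcMessageSize: Int,
    cpusPerTask: Int)(send: (String, ByteBuffer) => Unit): Unit = {
  for (task <- tasks) {
    val serialized = ByteBuffer.wrap(task.payload)
    if (serialized.limit() >= maxRpcMessageSize) {
      // Spark aborts the owning TaskSetManager at this point, suggesting a larger
      // spark.rpc.message.maxSize or the use of broadcast variables.
      println(s"Task ${task.taskId} is too large to ship (${serialized.limit()} bytes)")
    } else {
      // Corresponds to charging the executor scheduler.CPUS_PER_TASK cores
      executors(task.executorId).freeCores -= cpusPerTask
      // Corresponds to executorData.executorEndpoint.send(LaunchTask(...))
      send(task.executorId, serialized)
    }
  }
}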
One thing worth pointing out: it is in this step, inside scheduler.resourceOffers, that each task gets assigned to a concrete executor.
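The real resourceOffers also handles locality wait, blacklisting and barrier stages; the following is only a minimal sketch of the core matching idea, under the assumption that each launch consumes cpusPerTask free cores and that a task preferring the offer's host is picked first (all names are made up for illustration):

import scala.collection.mutable

// Toy model of an executor offer and a pending task with locality preferences.
final case class Offer(executorId: String, host: String, var freeCores: Int)
final case class PendingTask(taskId: Long, preferredHosts: Set[String])

// Match pending tasks against offers until no offer can accept another task.
def assignSketch(
    offers: Seq[Offer],
    pending: mutable.ArrayBuffer[PendingTask],
    cpusPerTask: Int = 1): Seq[(String, Long)] = {
  val launched = mutable.ArrayBuffer.empty[(String, Long)]
  var madeProgress = true
  while (madeProgress && pending.nonEmpty) {
    madeProgress = false
    for (offer <- offers if pending.nonEmpty && offer.freeCores >= cpusPerTask) {
      // Prefer a task whose preferred hosts include this offer's host, else take any.
      val idx = pending.indexWhere(_.preferredHosts.contains(offer.host))
      val task = pending.remove(if (idx >= 0) idx else 0)
      offer.freeCores -= cpusPerTask
      launched += ((offer.executorId, task.taskId))
      madeProgress = true
    }
  }
  launched.toSeq
}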
In the next section we will analyze how a Task is actually executed.