Spark's Speculative Execution Mechanism
Posted by 柚子聊大数据
While troubleshooting a production issue recently, I noticed the logs were full of TaskKilled (another attempt succeeded) messages. After checking the cluster configuration, it turned out that a colleague had enabled Spark speculative execution in an attempt to improve performance.
Key points of this article:
- What is speculative execution?
- How do you enable Spark speculative execution, and what are the related parameters?
- Should you use speculative execution?
Speculative Execution
In a distributed environment (this applies to Spark and Hadoop alike), resources and load may be unevenly distributed, so some tasks run much slower than others. To handle this, the speculative execution strategy was introduced: trade resources for time by running a second copy of a slow task on a different machine, take the result of whichever attempt finishes first, and kill the slower one.
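As a quick illustration of how to turn it on, speculation and its related parameters can be set through the standard Spark configuration keys. A minimal sketch (the values shown are Spark's defaults, discussed below, passed explicitly for clarity):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Minimal sketch: enable speculative execution and spell out its related
// parameters (these values are Spark's defaults, set explicitly for illustration).
val conf = new SparkConf()
  .set("spark.speculation", "true")            // off by default
  .set("spark.speculation.interval", "100ms")  // how often to check for slow tasks
  .set("spark.speculation.quantile", "0.75")   // fraction of tasks that must finish first
  .set("spark.speculation.multiplier", "1.5")  // how much slower than the median counts as slow

val spark = SparkSession.builder().config(conf).getOrCreate()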
How Spark Implements Speculative Execution
Let's look at how the speculative execution mechanism works in the Spark source code.
First, TaskSchedulerImpl:
// When Spark is not running in local mode and spark.speculation is set to true,
// a periodic thread is started to decide whether "backup" attempts are needed.
// SPECULATION_INTERVAL_MS comes from spark.speculation.interval (default: 100ms).
if (!isLocal && conf.getBoolean("spark.speculation", false)) {
  logInfo("Starting speculative execution thread")
  speculationScheduler.scheduleWithFixedDelay(new Runnable {
    override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
      checkSpeculatableTasks()
    }
  }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
}
Next, TaskSetManager:
override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
  // If this TaskSet is a zombie or consists of a single task, speculation never starts.
  if (isZombie || numTasks == 1) {
    return false
  }
  var foundTasks = false
  // SPECULATION_QUANTILE comes from spark.speculation.quantile (default: 0.75).
  val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
  logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
  // Only proceed once the finished tasks reach the quantile threshold
  // (75% of all tasks by default).
  if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
    val time = clock.getTimeMillis()
    val medianDuration = successfulTaskDurations.median
    // SPECULATION_MULTIPLIER comes from spark.speculation.multiplier (default: 1.5).
    // The threshold is the median duration of the finished tasks times 1.5,
    // but never lower than minTimeToSpeculation.
    val threshold = max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation)
    logDebug("Task length threshold for speculation: " + threshold)
    for (tid <- runningTasksSet) {
      val info = taskInfos(tid)
      val index = info.index
      if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
        !speculatableTasks.contains(index)) {
        logInfo(
          "Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms"
            .format(index, taskSet.id, info.host, threshold))
        speculatableTasks += index
        foundTasks = true
      }
    }
  }
  foundTasks
}
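To make the two thresholds concrete, here is a back-of-the-envelope example with made-up numbers: a stage of 100 tasks whose finished tasks have a median duration of 10 seconds.

// Hypothetical numbers, for illustration only.
val numTasks = 100
val speculationQuantile = 0.75    // spark.speculation.quantile (default)
val speculationMultiplier = 1.5   // spark.speculation.multiplier (default)
val medianDurationMs = 10000.0    // assumed median duration of successful tasks

// Speculation is considered only after 75 of the 100 tasks have finished...
val minFinishedForSpeculation = (speculationQuantile * numTasks).floor.toInt  // 75
// ...and only running tasks that have already taken longer than 15 seconds
// become candidates for a speculative copy.
val thresholdMs = speculationMultiplier * medianDurationMs                    // 15000.0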
For a task marked speculatable, the scheduling code that actually dispatches it is as follows:
// execId identifies an executor; host is the hostname that executor runs on;
// locality is the data-locality level being offered.
protected def dequeueSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value)
  : Option[(Int, TaskLocality.Value)] =
{
  // Time passes between a task entering the speculatable set and it finally being
  // offered to an executor, so re-check that the task has not finished in the meantime.
  speculatableTasks.retain(index => !successful(index))
  // A speculatable task may run on this executor only if:
  // 1. no attempt of the task is already running on this host, and
  // 2. the task is not blacklisted on this executor or node.
  def canRunOnHost(index: Int): Boolean = {
    !hasAttemptOnHost(index, host) &&
      !isTaskBlacklistedOnExecOrNode(index, execId, host)
  }
  if (!speculatableTasks.isEmpty) {
    // Check for process-local tasks; note that tasks can be process-local
    // on multiple nodes when we replicate cached blocks, as in Spark Streaming
    for (index <- speculatableTasks if canRunOnHost(index)) {
      val prefs = tasks(index).preferredLocations
      val executors = prefs.flatMap(_ match {
        case e: ExecutorCacheTaskLocation => Some(e.executorId)
        case _ => None
      });
      if (executors.contains(execId)) {
        speculatableTasks -= index
        return Some((index, TaskLocality.PROCESS_LOCAL))
      }
    }
    // The remaining checks look for a task matching the data-locality level passed in.
    // Check for node-local tasks
    if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
      for (index <- speculatableTasks if canRunOnHost(index)) {
        val locations = tasks(index).preferredLocations.map(_.host)
        if (locations.contains(host)) {
          speculatableTasks -= index
          return Some((index, TaskLocality.NODE_LOCAL))
        }
      }
    }
    // Check for no-preference tasks
    if (TaskLocality.isAllowed(locality, TaskLocality.NO_PREF)) {
      for (index <- speculatableTasks if canRunOnHost(index)) {
        val locations = tasks(index).preferredLocations
        if (locations.size == 0) {
          speculatableTasks -= index
          return Some((index, TaskLocality.PROCESS_LOCAL))
        }
      }
    }
    // Check for rack-local tasks
    if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
      for (rack <- sched.getRackForHost(host)) {
        for (index <- speculatableTasks if canRunOnHost(index)) {
          val racks = tasks(index).preferredLocations.map(_.host).flatMap(sched.getRackForHost)
          if (racks.contains(rack)) {
            speculatableTasks -= index
            return Some((index, TaskLocality.RACK_LOCAL))
          }
        }
      }
    }
    // Check for non-local tasks
    if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
      for (index <- speculatableTasks if canRunOnHost(index)) {
        speculatableTasks -= index
        return Some((index, TaskLocality.ANY))
      }
    }
  }
  None
}
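The cascade above tries PROCESS_LOCAL first and ANY last, gated by TaskLocality.isAllowed. Conceptually, the five locality levels form an ordering from most to least local, and a match is allowed only if it is at least as local as the level being offered. A simplified sketch of that semantics (an illustration, not the full Spark source):

// Simplified illustration of the locality ordering used by the checks above.
object TaskLocality extends Enumeration {
  // Declaration order defines the ordering, from most to least local.
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

  // A locality level is allowed under an offered constraint
  // if it is at least as local as that constraint.
  def isAllowed(constraint: Value, condition: Value): Boolean = condition <= constraint
}

// When the offer is NODE_LOCAL, a PROCESS_LOCAL match is acceptable,
// but a RACK_LOCAL one is not:
assert(TaskLocality.isAllowed(TaskLocality.NODE_LOCAL, TaskLocality.PROCESS_LOCAL))
assert(!TaskLocality.isAllowed(TaskLocality.NODE_LOCAL, TaskLocality.RACK_LOCAL))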
Should you enable Spark speculative execution?
Personally, I do not recommend enabling speculative execution. In every slow-task case I have run into, the root cause was data skew, and speculative execution cannot fix data skew: the backup attempt just reprocesses the same oversized partition, wasting resources.
Summary
- Speculative execution is a common trade-resources-for-time strategy in distributed computing frameworks; both Spark and MapReduce implement it.
- Setting spark.speculation to true enables it; it is off by default.
- A speculative task is launched only when the number of finished tasks exceeds a count threshold (total tasks × spark.speculation.quantile) and a running task's duration exceeds a time threshold (median duration of the finished tasks × spark.speculation.multiplier).
- Only enable speculative execution when you are confident that slow tasks are caused by uneven machine load, not by data skew.
- Task scheduling must satisfy the data-locality level. Spark has five locality levels: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, and ANY.