Spark 任务推断执行机制

Posted 柚子聊大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark 任务推断执行机制相关的知识,希望对你有一定的参考价值。

最近在定位现场问题时,发现日志里打印了很多的taskkilled (another attempt succeeded),经查看现场参数,原来业务同事试图通过开启spark speculative execution以提升性能。

本文要点:

  • 什么是speculative execution?
  • 如何开启spark speculative execution?有哪些相关的参数?
  • 该不该使用speculative execution?

Speculative Execution

在分布式环境下(包括spark、hadoop框架),可能存在资源、负载分布不均的问题,从而导致有的task执行的很慢,对于这种情况,前辈们就引入了speculative execution策略,以资源换时间,两个任务在不同机器上执行,最终哪个任务先执行完就用哪个的结果,并将慢的任务取消掉。

Spark 推断执行机制

下面从源码里看一下Spark Speculative Execution机制。

先看TaskSchedulerImpl

// spark 部分模式非local,且spark.speculation设为true时,会启动周期线程去判断是否需要启动“备份”任务.
// SPECULATION_INTERVAL_MS变量值即由spark.speculation.interval设定,默认是100ms.
if (!isLocal && conf.getBoolean("spark.speculation"false)) {
      logInfo("Starting speculative execution thread")
      speculationScheduler.scheduleWithFixedDelay(new Runnable {
        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
          checkSpeculatableTasks()
        }
      }, SPECULATION_INTERVAL_MSSPECULATION_INTERVAL_MSTimeUnit.MILLISECONDS)
}

再看TaskSetManager

override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
  // 若job仅拆分出了一个任务,不会开启推断机制.
    if (isZombie || numTasks == 1) {
      return false
    }
    var foundTasks = false
   // SPECULATION_QUANTILE,即由spark.speculation.quantile设置,默认为75%.
    val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
    logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
  // 已完成的任务数,占到了总任务数的75%
    if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
      val time = clock.getTimeMillis()
      var medianDuration = successfulTaskDurations.median
      // SPECULATION_MULTIPLIER,由spark.speculation.multiplier设置,默认值是1.5.
      // threshold值,即由已完成任务的时间中位数乘以1.5
      val threshold = max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation)
      logDebug("Task length threshold for speculation: " + threshold)
      for (tid <- runningTasksSet) {
        val info = taskInfos(tid)
        val index = info.index
        if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
          !speculatableTasks.contains(index)) {
          logInfo(
            "Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms"
              .format(index, taskSet.id, info.host, threshold))
          speculatableTasks += index
          foundTasks = true
        }
      }
    }
    foundTasks
  }

对于speculatable task,相关的调度代码如下:

// execId,表示一个executor;host,表示executor所在的主机名;locality,表示数据本地性级别.
// 
protected def dequeueSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value)
    : Option[(IntTaskLocality.Value)] =
  {
    // 在speculatable task放在HashSet中,到最终拿出来分配给executor,这之间是有时间差的,需要再次判断任务是否已经执行完毕.
    speculatableTasks.retain(index => !successful(index))
  // 判断speculatable task是否可以在这个executor上执行.
    // 1、原来的任务不是跑在这个机器上的.2、不在黑名单范围内.
    def canRunOnHost(index: Int): Boolean = {
      !hasAttemptOnHost(index, host) &&
        !isTaskBlacklistedOnExecOrNode(index, execId, host)
    }

    if (!speculatableTasks.isEmpty) {
      // Check for process-local tasks; note that tasks can be process-local
      // on multiple nodes when we replicate cached blocks, as in Spark Streaming
      for (index <- speculatableTasks if canRunOnHost(index)) {
        val prefs = tasks(index).preferredLocations
        val executors = prefs.flatMap(_ match {
          case e: ExecutorCacheTaskLocation => Some(e.executorId)
          case _ => None
        });
        if (executors.contains(execId)) {
          speculatableTasks -= index
          return Some((index, TaskLocality.PROCESS_LOCAL))
        }
      }
      
   // 下面是根据方法入参的数据本地性级别,来寻找匹配的任务.
      // Check for node-local tasks
      if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
        for (index <- speculatableTasks if canRunOnHost(index)) {
          val locations = tasks(index).preferredLocations.map(_.host)
          if (locations.contains(host)) {
            speculatableTasks -= index
            return Some((index, TaskLocality.NODE_LOCAL))
          }
        }
      }

      // Check for no-preference tasks
      if (TaskLocality.isAllowed(locality, TaskLocality.NO_PREF)) {
        for (index <- speculatableTasks if canRunOnHost(index)) {
          val locations = tasks(index).preferredLocations
          if (locations.size == 0) {
            speculatableTasks -= index
            return Some((index, TaskLocality.PROCESS_LOCAL))
          }
        }
      }

      // Check for rack-local tasks
      if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
        for (rack <- sched.getRackForHost(host)) {
          for (index <- speculatableTasks if canRunOnHost(index)) {
            val racks = tasks(index).preferredLocations.map(_.host).flatMap(sched.getRackForHost)
            if (racks.contains(rack)) {
              speculatableTasks -= index
              return Some((index, TaskLocality.RACK_LOCAL))
            }
          }
        }
      }

      // Check for non-local tasks
      if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
        for (index <- speculatableTasks if canRunOnHost(index)) {
          speculatableTasks -= index
          return Some((index, TaskLocality.ANY))
        }
      }
    }
    None
  }

该不该开启spark speculative execution机制?

我个人是不建议去开启speculative execution的。因为在我碰到的task执行时间长的案例中,都是出现了数据倾斜问题,开启speculative execution并不能解决数据倾斜问题,反而会浪费资源。

总结

  • Speculative Execution是分布式计算框架中常见的以空间换时间的策略,Spark、MapReduce框架中都引入了。
  • spark.speculation设置为true即可开启,默认情况下是关闭的。
  • 已完成task数量必须大于阈值(总任务数✖️spark.speculation.quantile),执行时间也必须超过阈值时(已完成任务时间的中位数✖️spark.speculation.multiplier),才会有speculative task。
  • 在使用speculative execution时,必须要坚信task慢是因为机器负载不均导致的,而不是由数据倾斜导致。
  • 在任务调度时,需要满足数据本地性级别要求。Spark数据本地性级别共有PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY五种。

以上是关于Spark 任务推断执行机制的主要内容,如果未能解决你的问题,请参考以下文章

Spark面试题——Spark的内存管理机制

Spark Core任务运行机制和Task源代码浅析1

11.spark sql之RDD转换DataSet

Spark 初始化机制(SparkContext干了什么?Scheduler 干了什么?)

Spark内存管理机制

Spark 任务调度机制详解