SPARK k8s backend中Executor Rolling(Executor的自动化滚动驱逐)
Posted 鸿乃江边鸟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SPARK k8s backend中Executor Rolling(Executor的自动化滚动驱逐)相关的知识,希望对你有一定的参考价值。
背景
本文基于SPARK 3.3.0
在Spark 3.3.0中出现了一个新特性那就是自动重启Executor,这个主要解决是什么问题呢? 主要解决在Streaming中由于一个Executor的处理延迟导致整个Streaming任务延迟,但是这也是适用于批任务,使得批任务Executor的驱逐更加灵活。具体的可参考SPARK-37810
分析
在spark 3.3.0之前,如果发现任务是比较慢或者任务失败了,
- 可以开启
spark.speculation
(默认是关闭的),进行推测执行, - 也可以开启
spark.excludeOnFailure.enabled
(默认是关闭的)以保证task不会重新调度到失败的Executor上 - 如果发现executor失败了,可以开启
spark.excludeOnFailure.killExcludedExecutors
(默认是关闭的),确保在fetch失败的时候,把execlude给删除掉。
但是这些都是事后的弥补方式,所以这里提出的Executor Rolling是事前预测执行的方式,该方式会周期性的轮询。
直接看代码ExecutorRollPlugin
:
class ExecutorRollPlugin extends SparkPlugin
override def driverPlugin(): DriverPlugin = new ExecutorRollDriverPlugin()
// No-op
override def executorPlugin(): ExecutorPlugin = null
它继承SparkPlugin
,关于SparkPlugin,可以参考spark 3.x Plugin Framework,总的来说,spark提供了一种插件机制,我们可以灵活的用它来做自己想要的事情,比如说 自定义指标等等。
我们看到这里executorPlugin
方法是为null
的,因为Executor的启动停止调度是在Driver进行的,所以executor根本不需要。
而对于ExecutorRollDriverPlugin
:
class ExecutorRollDriverPlugin extends DriverPlugin with Logging
override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] =
val interval = sc.conf.get(EXECUTOR_ROLL_INTERVAL)
if (interval <= 0)
logWarning(s"Disabled due to invalid interval value, '$interval'")
else if (!sc.conf.get(DECOMMISSION_ENABLED))
logWarning(s"Disabled because $DECOMMISSION_ENABLED.key is false.")
else
minTasks = sc.conf.get(MINIMUM_TASKS_PER_EXECUTOR_BEFORE_ROLLING)
// Scheduler is not created yet
sparkContext = sc
val policy = ExecutorRollPolicy.withName(sc.conf.get(EXECUTOR_ROLL_POLICY))
periodicService.scheduleAtFixedRate(() =>
try
sparkContext.schedulerBackend match
case scheduler: KubernetesClusterSchedulerBackend =>
val executorSummaryList = sparkContext
.statusStore
.executorList(true)
choose(executorSummaryList, policy) match
case Some(id) =>
// Use decommission to be safe.
logInfo(s"Ask to decommission executor $id")
val now = System.currentTimeMillis()
scheduler.decommissionExecutor(
id,
ExecutorDecommissionInfo(s"Rolling via $policy at $now"),
adjustTargetNumExecutors = false)
case _ =>
logInfo("There is nothing to roll.")
case _ =>
logWarning("This plugin expects " +
s"$classOf[KubernetesClusterSchedulerBackend].getSimpleName.")
catch
case e: Throwable => logError("Error in rolling thread", e)
, interval, interval, TimeUnit.SECONDS)
Map.empty[String, String].asJava
....
private def choose(list: Seq[v1.ExecutorSummary], policy: ExecutorRollPolicy.Value)
: Option[String] =
val listWithoutDriver = list
.filterNot(_.id.equals(SparkContext.DRIVER_IDENTIFIER))
.filter(_.totalTasks >= minTasks)
val sortedList = policy match
case ExecutorRollPolicy.ID =>
// We can convert to integer because EXECUTOR_ID_COUNTER uses AtomicInteger.
listWithoutDriver.sortBy(_.id.toInt)
case ExecutorRollPolicy.ADD_TIME =>
listWithoutDriver.sortBy(_.addTime)
case ExecutorRollPolicy.TOTAL_GC_TIME =>
listWithoutDriver.sortBy(_.totalGCTime).reverse
case ExecutorRollPolicy.TOTAL_DURATION =>
listWithoutDriver.sortBy(_.totalDuration).reverse
case ExecutorRollPolicy.AVERAGE_DURATION =>
listWithoutDriver.sortBy(e => e.totalDuration.toFloat / Math.max(1, e.totalTasks)).reverse
case ExecutorRollPolicy.FAILED_TASKS =>
listWithoutDriver.sortBy(_.failedTasks).reverse
case ExecutorRollPolicy.OUTLIER =>
// If there is no outlier we fallback to TOTAL_DURATION policy.
outliersFromMultipleDimensions(listWithoutDriver) ++
listWithoutDriver.sortBy(_.totalDuration).reverse
case ExecutorRollPolicy.OUTLIER_NO_FALLBACK =>
outliersFromMultipleDimensions(listWithoutDriver)
sortedList.headOption.map(_.id)
这里是periodicService
单个线程定时触发,如果发现backend是k8s的话(所以目前只适用于spark on k8s),就会从已有的AppStatusStore
(通过AppStatusListener机制获取到对应的Event,从而存储信息,目前来看,executor的metrics信息是通过heartbeat
来传递到driver端的)存储中取出Executor的信息,进而根据配置的策略(Executor创建的ID,失败的task,GC时间等)进行驱逐。
当然在驱逐Executor的时候,也会考虑目前在Executor上运行的task的个数,具体配置为spark.kubernetes.executor.minTasksPerExecutorBeforeRolling
(默认是0),只有小于等于该阈值,才会kill 对应的Executor,而且默认是只驱逐一个Executor。
该方式的优点:
- 事前的处理方式,而不是事后处理
- 独立于ExecutorPodsAllocator,使组件之间功能明确,便于代码维护
- 适用于动态和静态Executor资源分配的场景
- 驱逐策略根据运行时的统计信息来的,更加合理
具体使用方式,配置如下:
spark.plugins=org.apache.spark.scheduler.cluster.k8s.ExecutorRollPlugin
spark.decommission.enabled=true
spark.kubernetes.executor.rollInterval=3600s
当然,对于对于这种Executor驱逐,有其他公司也提出了不同的解决方法,如:SPARK-37028
以上是关于SPARK k8s backend中Executor Rolling(Executor的自动化滚动驱逐)的主要内容,如果未能解决你的问题,请参考以下文章