SPARK中metrics是怎么传递的
Posted 鸿乃江边鸟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SPARK中metrics是怎么传递的相关的知识,希望对你有一定的参考价值。
背景
本文基于spark 3.3.0
在看spark源码的时候,总是会看到类似longMetric("numOutputRows")
的信息,但是一般来说这种metrics
的定义一般是在Driver
端,而真正的+1
或者-1
操作都是在executor
进行的,这种指标到底是怎么传递的呢?我们分析一下
分析
以FilterExec
物理计划为例:
case class FilterExec(condition: Expression, child: SparkPlan)
extends UnaryExecNode with CodegenSupport with GeneratePredicateHelper
...
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
...
protected override def doExecute(): RDD[InternalRow] =
val numOutputRows = longMetric("numOutputRows")
child.execute().mapPartitionsWithIndexInternal (index, iter) =>
val predicate = Predicate.create(condition, child.output)
predicate.initialize(0)
iter.filter row =>
val r = predicate.eval(row)
if (r) numOutputRows += 1
r
为什么这么写可以
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
,这里只定义了一个numOutputRows
的指标,用来记录该物理操作过滤了多少行的数据if (r) numOutputRows += 1
这个操作会在executor
端执行
其实要看懂这个操作,我们要深入一下SQLMetrics.createMetric
的实现
def createMetric(sc: SparkContext, name: String): SQLMetric =
val acc = new SQLMetric(SUM_METRIC)
acc.register(sc, name = metricsCache.get(name), countFailedValues = false)
acc
...
abstract class AccumulatorV2[IN, OUT] extends Serializable
其中SQLMetric
类是继承AccumulatorV2
,从而继承了Serializable
,所以这个类是可序列化的,而且是可java序列化的,这一点很重要。
再看SQLMetric
的register
方法,
private[spark] def register(
sc: SparkContext,
name: Option[String] = None,
countFailedValues: Boolean = false): Unit =
if (this.metadata != null)
throw new IllegalStateException("Cannot register an Accumulator twice.")
this.metadata = AccumulatorMetadata(AccumulatorContext.newId(), name, countFailedValues)
AccumulatorContext.register(this)
sc.cleaner.foreach(_.registerAccumulatorForCleanup(this))
this.metadata = AccumulatorMetadata(AccumulatorContext.newId(), name, countFailedValues)
分配一个拥有全局唯一的id的AccumulatorMetadata
实例AccumulatorContext.register(this)
这个调用了往map中登记了以全局唯一id为key,value为WeakReference的值,这里登记到map
的作用就是后续Task
会对该metrics
的值进行操作,下面会说到sc.cleaner.foreach(_.registerAccumulatorForCleanup(this))
这步操作和之前的文章说的一样SPARK 是怎么清除Shuffle中间结果数据的,只不过这里只是清理了Driver
端的metrics
这里很重要:
在scala里会有闭包的概念(这里可以自己网上查找原理),但是spark也会对闭包进一步进行处理,详见ClosureCleaner.clean
方法。总结一下,简单来说,就是exeuctor会序列化用到的变量,所以说SQLMetric
必须是可java序列化的(同时全局唯一的id也会被序列化)。
executor端的变量怎么传递到Driver端的
- 我们先来看AccumulatorV2的readObject方法:
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException
in.defaultReadObject()
if (atDriverSide)
atDriverSide = false
// Automatically register the accumulator when it is deserialized with the task closure.
// This is for external accumulators and internal ones that do not represent task level
// metrics, e.g. internal SQL metrics, which are per-operator.
val taskContext = TaskContext.get()
if (taskContext != null)
taskContext.registerAccumulator(this)
else
atDriverSide = true
这个代码会在Executor
执行,所以会执行taskContext.registerAccumulator(this)
从而调用taskMetrics.registerAccumulator(a)
,从而保存在名为externalAccums
的ArrayBuffer
中
2. 再看task端的执行TaskRunner
的run()
方法:
task = ser.deserialize[Task[Any]]( taskDescription.serializedTask, Thread.currentThread.getContextClassLoader) ... val accumUpdates = task.collectAccumulatorUpdates()
- 这里会调用
ser.deserialize
方法,从而触发AccumulatorV2
的readObject
方法,从而该AccumulatorV2
变量会保存在executor
端,且保留了全局唯一id。 val accumUpdates = task.collectAccumulatorUpdates()
收集spark内置的metrics(如remoteBlocksFetched)和自定义的metrics,
这个会通过execBackend.statusUpdate
方法,传达Driver端,最终调用到DAGScheduler
的updateAccumulators
方法更新指标:private def updateAccumulators(event: CompletionEvent): Unit = val task = event.task val stage = stageIdToStage(task.stageId) event.accumUpdates.foreach updates => val id = updates.id try // Find the corresponding accumulator on the driver and update it val acc: AccumulatorV2[Any, Any] = AccumulatorContext.get(id) match case Some(accum) => accum.asInstanceOf[AccumulatorV2[Any, Any]] case None => throw SparkCoreErrors.accessNonExistentAccumulatorError(id) acc.merge(updates.asInstanceOf[AccumulatorV2[Any, Any]]) // To avoid UI cruft, ignore cases where value wasn't updated if (acc.name.isDefined && !updates.isZero) stage.latestInfo.accumulables(id) = acc.toInfo(None, Some(acc.value)) event.taskInfo.setAccumulables( acc.toInfo(Some(updates.value), Some(acc.value)) +: event.taskInfo.accumulables)
acc.merge
这个方法就完成了指标的更新。event.taskInfo.setAccumulables
这个是给当前event更新到最新的metrics,因为最终driver调用SparkListenerTaskEnd
方法,从而被AppStatusListener
的onTaskEnd
方法接受,从而完成Spark UI的更新(被AppStatusStore调用)。
同时也被SQLAppStatusListener
的onTaskEnd
方法接受,这里读者自己看代码即可,结果也是完成Spark UI的更新(被SQLAppStatusStore调用)
- 再看Executor端的reportHeartBeat方法:
这个private def reportHeartBeat(): Unit = ... val accumulatorsToReport = if (HEARTBEAT_DROP_ZEROES) taskRunner.task.metrics.accumulators().filterNot(_.isZero) else taskRunner.task.metrics.accumulators() accumUpdates += ((taskRunner.taskId, accumulatorsToReport)) ... val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId, executorUpdates) try val response = heartbeatReceiverRef.askSync[HeartbeatResponse]( message, new RpcTimeout(HEARTBEAT_INTERVAL_MS.millis, EXECUTOR_HEARTBEAT_INTERVAL.key))
reportHeartBeat
会被周期的性的调用,用来向driver发送心跳信息,同时会带上metrics信息(包括spark内置的metrics和自定义的metrics),该方法通过向driver
发送Heartbeat
消息,最终会调用到DAGScheduler
的executorHeartbeatReceived
方法,从而被AppStatusListener
的onExecutorMetricsUpdate
方法接受:
这里更新的是正在运行的task的指标更新,从而更新到Spark UI界面(被AppStatusStore调用)。override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = val now = System.nanoTime() event.accumUpdates.foreach case (taskId, sid, sAttempt, accumUpdates) => liveTasks.get(taskId).foreach task => val metrics = TaskMetrics.fromAccumulatorInfos(accumUpdates) val delta = task.updateMetrics(metrics) maybeUpdate(task, now) Option(liveStages.get((sid, sAttempt))).foreach stage => stage.metrics = LiveEntityHelpers.addMetrics(stage.metrics, delta) maybeUpdate(stage, now) val esummary = stage.executorSummary(event.execId) esummary.metrics = LiveEntityHelpers.addMetrics(esummary.metrics, delta) maybeUpdate(esummary, now)
还有被SQLAppStatusListener
的onExecutorMetricsUpdate
方法接受,这里读者自己看代码即可,结果也是完成Spark UI的更新(被SQLAppStatusStore调用)
总结
在Driver端定义的metrics,会被反序列化到Executor端,在Executor端,通过两种方式传回Driver端:
- 在任务运行期间,利用heartbeat心跳来传递metrics
- 在任务结束以后,利用任务结果的更新来传递metrics
最终,都是通过sparkListener:SQLAppStatusListener和 AppStatusListener分别完成Spark UI状态的更新。
以上是关于SPARK中metrics是怎么传递的的主要内容,如果未能解决你的问题,请参考以下文章