spark listener
Posted jason-dong
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark listener相关的知识,希望对你有一定的参考价值。
最近在做一个需求,当spark程序在读数据或写数据时,将所读的条数或或所写的条数实时的展现出来,这里用到了SparkListener,sparklisten 可以获取spark 各个运行阶段的状态。
首先我们先通过代码来分析下各个方法的功能,再来说思路
package org.apache.spark
import org.apache.spark.scheduler._
import org.apache.spark.sql.{SaveMode, SparkSession}
object ListenerTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("test").setMaster("local[*]")
.set("spark.extraListeners", classOf[BasicJobCounter].getName)
//.set("spark.extraListeners", classOf[BasicJobCounter].getName)
.set("spark.executor.heartbeatInterval", "1000ms")
val spark = SparkSession.builder()
.config(conf)
.getOrCreate()
val fpath = args(0)
val csvdf = spark.read.format("csv")
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.load(fpath)
csvdf.select("movieids").show()
val schema = csvdf.schema
val df = spark.createDataFrame(csvdf.rdd.repartition(5).setName("xxxxxxxxxxxxxxxxxxxxxxxxx"), schema)
//val df = spark.createDataFrame(csvdf.rdd.setName("xxxxxxxxxxxxxxxxxxxxxxxxx"), schema)
df.write.mode(SaveMode.Overwrite)
.format("csv")
.option("sep", ",")
.option("header", "true")
.save(args(1))
spark.stop()
}
}
private class OnlyExeUpdata extends SparkListener {
val map = scala.collection.mutable.Map.empty[Long, Long]
def getAccOut(list: List[AccumulableInfo], name: String): Option[AccumulableInfo] = {
list match {
case Nil => None
case head :: tail => if (head.name.isDefined && head.name.get == name) {
Some(head)
} else {
getAccOut(tail, name)
}
}
}
override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {
printx("onExecutorMetricsUpdate")
val execId = executorMetricsUpdate.execId
val accu = executorMetricsUpdate.accumUpdates
println(s"execId ${execId}")
for ((taskId, stageId, stageAttemptId, accumUpdates) <- accu) {
//println(s"""${stageId} ${accumUpdates.mkString("<==>")}""")
/*for (acc <- accumUpdates if (acc.name.isDefined && "number of output rows" == acc.name.get)) {
println(s"""${taskId} ${stageId} ${acc}""")
if (3L == stageId) {
sum += acc.update.get.asInstanceOf[Long]
}
}*/
println(s"""==${taskId} ${accumUpdates.mkString("<==>")}==""")
val acc = getAccOut(accumUpdates.toList, "number of output rows")
if (3L == stageId && acc.isDefined) {
println(s"${taskId} ${acc.get.update.get.asInstanceOf[Long]}")
map += taskId -> acc.get.update.get.asInstanceOf[Long]
}
}
if (map.size > 0) {
val sum = map.values.reduce((x, y) => x + y)
println(s"sum $sum")
}
printx("onExecutorMetricsUpdate")
}
def printx(label: String): Unit = {
println(s"=" * 20 + label + s"=" * 20)
}
}
private class BasicJobCounter extends SparkListener {
var lacc = 0L
// app job stage executor task
//======================applicatioin=========================
/**
*app 开始时跳去的方法
* 该方法可以获取 appId,appName ,app开始的时间以及 执行程序的用户
* @param applicationStart
*/
override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
printx("onApplicationStart")
println(s"applicationStart.appAttemptId = ${applicationStart.appAttemptId}")
println(s"applicationStart.appId = ${applicationStart.appId}")
println(s"applicationStart.appName = ${applicationStart.appName}")
println(s"applicationStart.driverLogs = ${applicationStart.driverLogs}")
println(s"applicationStart.sparkUser = ${applicationStart.sparkUser}")
println(s"applicationStart.time = ${applicationStart.time}")
printx("onApplicationStart")
}
/**
* app结束时调用的方法
* 可以获取app结束的时间点
* @param applicationEnd
*/
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
printx("onApplicationEnd")
println(s"applicationEnd.time = ${applicationEnd.time}")
printx("onApplicationEnd")
}
//======================applicatioin=========================
//======================job===============================
/**
* job开始时调用的方法
* 可以获取jobId,以及该job所包含的stage的信息
* stage 包括如下信息:stageID,stage 中rdd name,stage 的name 和 task的数量
* @param jobStart
*/
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
printx("onJobStart")
println(s"jobStart.jobId = ${jobStart.jobId}")
jobStart.stageIds.foreach(a => println(s"stageId $a"))
val stageInfos = jobStart.stageInfos
stageInfos.foreach {
si =>
val rddInfos = si.rddInfos
println(Seq(
s"stageId ${si.stageId}",
si.rddInfos.map(a => a.name).mkString("rddname ", ",", " rddname"),
s"si.details ${si.details}",
s"si.name ${si.name}",
//si.taskMetrics.accumulators().mkString(","),
si.accumulables.mkString("accu", ",", "accu"),
s"si.numTasks ${si.numTasks}").mkString("<<<", "__fgf__", ">>>"))
}
printx("onJobStart")
}
/**
* jobs 结束时候调用
* 可以获取jobID, jobResult(job 是否成功)
* @param jobEnd
*/
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
printx("onJobEnd")
println(s"jobEnd.jobId ${jobEnd.jobId}")
println(s"jobEnd.jobResult ${jobEnd.jobResult}")
printx("onJobEnd")
}
//======================job===============================
//======================stage========================
/**
* stage 提交时调用的方法
* 可以获取stage的一些信息
* @param stageSubmitted
*/
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
printx("onStageSubmitted")
val si = stageSubmitted.stageInfo
val rddInfos = si.rddInfos
println(Seq(
s"stageId ${si.stageId}",
si.rddInfos.map(a => a.name).mkString("rddname ", ",", " rddname"),
s"si.details ${si.details}",
s"si.name ${si.name}",
//si.taskMetrics.accumulators().mkString(","),
si.accumulables.mkString("accu", ",", "accu"),
s"si.numTasks ${si.numTasks}").mkString("<<<", "__fgf__", ">>>"))
printx("onStageSubmitted")
}
/**
* stage 结束时调用的方法
* 可以获取stage 的一些信息,比较重要的accumulables,这里包含了一系列的统计信息,
* 其中就包含了所read or write 的条数,当然了时该stage 的总条数
* @param stageCompleted
*/
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
printx("onStageCompleted")
val si = stageCompleted.stageInfo
println(s"stageId ${stageCompleted.stageInfo.stageId}")
val rddInfos = si.rddInfos
println(Seq(
s"stageId ${si.stageId}",
si.rddInfos.map(a => a.name).mkString("rddname ", ",", " rddname"),
s"si.details ${si.details}",
s"si.name ${si.name}",
si.taskMetrics.accumulators().mkString("taskmetric", ",", "taskmetric"),
si.accumulables.mkString("accu", ",", "accu"),
s"si.numTasks ${si.numTasks}").mkString("<<<", "__fgf__", ">>>"))
printx("onStageCompleted")
}
//======================stage========================
//===========================executor========================
/**
* 当executor的统计指标更新时调用的方法,
* 可以获取 executorID,以及executor中的累加器(重点就在此)
* 累加器中包含了taskID,stageID,以及统计信息(其中包含了读写的条数)
* @param executorMetricsUpdate
*/
override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {
printx("onExecutorMetricsUpdate")
val execId = executorMetricsUpdate.execId
val accu = executorMetricsUpdate.accumUpdates
println(s"execId ${execId}")
for ((taskId, stageId, stageAttemptId, accumUpdates) <- accu) {
//println(s"""${stageId} ${accumUpdates.mkString("<==>")}""")
for (acc <- accumUpdates if (acc.name.isDefined && "number of output rows" == acc.name.get)) {
println(s"""${stageId} ${taskId} ${acc}""")
}
}
printx("onExecutorMetricsUpdate")
}
/**
* 添加executor时调用的方法
* @param executorAdded
*/
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
printx("onExecutorAdded")
val exeId = executorAdded.executorId
println(s"exeId ${exeId}")
val exeInfo = executorAdded.executorInfo
println(s"exeInfo.executorHost ${exeInfo.executorHost}")
println(s"exeInfo.logUrlMap ${exeInfo.logUrlMap}")
println(s"exeInfo.totalCores ${exeInfo.totalCores}")
printx("onExecutorAdded")
}
//===========================executor========================
//======================task===========================
/**
* task开始时调用的方法
* @param taskStart
*/
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
printx("onTaskStart")
val stageAttempId = taskStart.stageAttemptId
val stageId = taskStart.stageId
val taskInfo = taskStart.taskInfo
println(s"stageAttempId ${stageAttempId}")
println(s"stageId ${stageId}")
println(s"""taskInfo ${taskInfo.accumulables.mkString("<==>")}""")
printx("onTaskStart")
}
/**
* task 结束时调用的方法
* @param taskEnd
*/
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
printx("onTaskEnd")
val reason = taskEnd.reason
val sid = taskEnd.stageId
val taskInfo = taskEnd.taskInfo
val taskMetrics = taskEnd.taskMetrics
println(s"taskend reason ${reason}")
println(s"stageId sid ${sid}")
println(s"taskInfo ${taskInfo.accumulables.mkString("<==>")}")
println(s"""taskMetrics ${taskMetrics.accumulators().mkString("<==>")}""")
printx("onTaskEnd")
}
//======================task===========================
def printx(label: String): Unit = {
println(s"=" * 20 + label + s"=" * 20)
}
}
思路是这样的:
1.df 底层所存储的rdd是可以命名的在获取df时,比如 val df = spark.read.format("csv")...load(..) 时,对df 内rdd重命名 ,df.rdd.setName("你想要取的名字"), 这样在onJobStart 时我们就可以找到对应的df 所在的stage,而onExecutorMetricsUpdate可以实时获取某个stage 所读/写的条数,只需将这个条数记录下来然后进行显示即可,注意:这里要对onExecutorMetricsUpdate 中所有task的累加器数据进行累加,文字不容易描述,具体见OnlyExeUpdata,否则统计的中间结果会大于实际值,onExecutorMetricsUpdate 会在心跳反馈时调用一次,因此可以通过设置心跳时间来控制该方法调用的频率,最后的准确值需要在onStageComplete 方法中获取,
要启动 sparkListener ,可以 在配置中添加对应类set("spark.extraListeners", classOf[BasicJobCounter].getName),文字不是很详细,具体可以运行代码结合输出结果来理解,
注: onExecutorMetricsUpdate 所对应的 accumUpdates 里可能包含了两条 (num of output rows) 的统计,有时候两条值一样,有时候一个为0,一个不为0,当对df repartition 后就会出现两个(num of output rows),具体原因还不清楚,使用时需注意,我只取了有值的(num of output rows),都有值的,随便选一个。
附上运行结果:结果可能有些被编辑器截断了
====================onExecutorAdded====================
exeId driver
exeInfo.executorHost localhost
exeInfo.logUrlMap Map()
exeInfo.totalCores 4
====================onExecutorAdded====================
====================onApplicationStart====================
applicationStart.appAttemptId = None
applicationStart.appId = Some(local-1543335649097)
applicationStart.appName = test
applicationStart.driverLogs = None
applicationStart.sparkUser = MI
applicationStart.time = 1543335646997
====================onApplicationStart====================
====================onExecutorMetricsUpdate====================
execId driver
====================onExecutorMetricsUpdate====================
====================onExecutorMetricsUpdate====================
execId driver
====================onExecutorMetricsUpdate====================
====================onExecutorMetricsUpdate====================
execId driver
====================onExecutorMetricsUpdate====================
====================onJobStart====================
jobStart.jobId = 0
stageId 0
<<<stageId 0__fgf__rddname MapPartitionsRDD,MapPartitionsRDD,FileScanRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__accuaccu__fgf__si.numTasks 1>>>
====================onJobStart====================
====================onStageSubmitted====================
<<<stageId 0__fgf__rddname MapPartitionsRDD,MapPartitionsRDD,FileScanRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__accuaccu__fgf__si.numTasks 1>>>
====================onStageSubmitted====================
====================onTaskStart====================
stageAttempId 0
stageId 0
taskInfo
====================onTaskStart====================
====================onExecutorMetricsUpdate====================
execId driver
0 0 AccumulableInfo(2,Some(number of output rows),Some(2),None,true,true,Some(sql))
0 0 AccumulableInfo(1,Some(number of output rows),Some(2),None,true,true,Some(sql))
====================onExecutorMetricsUpdate====================
====================onTaskEnd====================
taskend reason Success
stageId sid 0
taskInfo AccumulableInfo(1,Some(number of output rows),Some(2),Some(2),true,true,Some(sql))<==>AccumulableInfo(2,Some(number of output rows),Some(2),Some(2),true,true,Some(sql))<==>AccumulableInfo(27,Some(internal.metrics.input.recordsRead),Some(2),Some(2),true,true,None)<==>AccumulableInfo(26,Some(internal.metrics.input.bytesRead),Some(6512496),Some(6512496),true,true,None)<==>AccumulableInfo(10,Some(internal.metrics.resultSize),Some(1299),Some(1299),true,true,None)<==>AccumulableInfo(9,Some(internal.metrics.executorCpuTime),Some(78125000),Some(78125000),true,true,None)<==>AccumulableInfo(8,Some(internal.metrics.executorRunTime),Some(91),Some(91),true,true,None)<==>AccumulableInfo(7,Some(internal.metrics.executorDeserializeCpuTime),Some(15625000),Some(15625000),true,true,None)<==>AccumulableInfo(6,Some(internal.metrics.executorDeserializeTime),Some(23),Some(23),true,true,None)
taskMetrics LongAccumulator(id: 6, name: Some(internal.metrics.executorDeserializeTime), value: 23)<==>LongAccumulator(id: 7, name: Some(internal.metrics.executorDeserializeCpuTime), value: 15625000)<==>LongAccumulator(id: 8, name: Some(internal.metrics.executorRunTime), value: 91)<==>LongAccumulator(id: 9, name: Some(internal.metrics.executorCpuTime), value: 78125000)<==>LongAccumulator(id: 10, name: Some(internal.metrics.resultSize), value: 1299)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 26, name: Some(internal.metrics.input.bytesRead), value: 6512496)<==>LongAccumulator(id: 27, name: Some(internal.metrics.input.recordsRead), value: 2)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 0, name: Some(duration total (min, med, max)), value: -1)<==>SQLMetric(id: 2, name: Some(number of output rows), value: 2)<==>SQLMetric(id: 1, name: Some(number of output rows), value: 2)
====================onTaskEnd====================
====================onStageCompleted====================
stageId 0
<<<stageId 0__fgf__rddname MapPartitionsRDD,MapPartitionsRDD,FileScanRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__taskmetricLongAccumulator(id: 6, name: Some(internal.metrics.executorDeserializeTime), value: 23),LongAccumulator(id: 7, name: Some(internal.metrics.executorDeserializeCpuTime), value: 15625000),LongAccumulator(id: 8, name: Some(internal.metrics.executorRunTime), value: 91),LongAccumulator(id: 9, name: Some(internal.metrics.executorCpuTime), value: 78125000),LongAccumulator(id: 10, name: Some(internal.metrics.resultSize), value: 1299),LongAccumulator(id: 11, name: Some(internal.metrics.jvmGCTime), value: 0),LongAccumulator(id: 12, name: Some(internal.metrics.resultSerializationTime), value: 0),LongAccumulator(id: 13, name: Some(internal.metrics.memoryBytesSpilled), value: 0),LongAccumulator(id: 14, name: Some(internal.metrics.diskBytesSpilled), value: 0),LongAccumulator(id: 15, name: Some(internal.metrics.peakExecutionMemory), value: 0),CollectionAccumulator(id: 16, name: Some(internal.metrics.updatedBlockStatuses), value: []),LongAccumulator(id: 17, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0),LongAccumulator(id: 18, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 0),LongAccumulator(id: 19, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0),LongAccumulator(id: 20, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 0),LongAccumulator(id: 21, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0),LongAccumulator(id: 22, name: Some(internal.metrics.shuffle.read.recordsRead), value: 0),LongAccumulator(id: 23, name: Some(internal.metrics.shuffle.write.bytesWritten), value: 0),LongAccumulator(id: 24, name: Some(internal.metrics.shuffle.write.recordsWritten), value: 0),LongAccumulator(id: 25, name: Some(internal.metrics.shuffle.write.writeTime), value: 0),LongAccumulator(id: 26, name: Some(internal.metrics.input.bytesRead), value: 6512496),LongAccumulator(id: 27, name: Some(internal.metrics.input.recordsRead), value: 2),LongAccumulator(id: 28, name: Some(internal.metrics.output.bytesWritten), value: 0),LongAccumulator(id: 29, name: Some(internal.metrics.output.recordsWritten), value: 0)taskmetric__fgf__accu26 -> AccumulableInfo(26,Some(internal.metrics.input.bytesRead),None,Some(6512496),true,true,None),8 -> AccumulableInfo(8,Some(internal.metrics.executorRunTime),None,Some(91),true,true,None),2 -> AccumulableInfo(2,Some(number of output rows),None,Some(2),true,true,Some(sql)),7 -> AccumulableInfo(7,Some(internal.metrics.executorDeserializeCpuTime),None,Some(15625000),true,true,None),1 -> AccumulableInfo(1,Some(number of output rows),None,Some(2),true,true,Some(sql)),10 -> AccumulableInfo(10,Some(internal.metrics.resultSize),None,Some(1299),true,true,None),27 -> AccumulableInfo(27,Some(internal.metrics.input.recordsRead),None,Some(2),true,true,None),9 -> AccumulableInfo(9,Some(internal.metrics.executorCpuTime),None,Some(78125000),true,true,None),6 -> AccumulableInfo(6,Some(internal.metrics.executorDeserializeTime),None,Some(23),true,true,None)accu__fgf__si.numTasks 1>>>
====================onStageCompleted====================
====================onJobEnd====================
jobEnd.jobId 0
jobEnd.jobResult JobSucceeded
====================onJobEnd====================
====================onJobStart====================
jobStart.jobId = 1
stageId 1
<<<stageId 1__fgf__rddname MapPartitionsRDD,MapPartitionsRDD,MapPartitionsRDD,FileScanRDD,MapPartitionsRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__accuaccu__fgf__si.numTasks 4>>>
====================onJobStart====================
====================onStageSubmitted====================
<<<stageId 1__fgf__rddname MapPartitionsRDD,MapPartitionsRDD,MapPartitionsRDD,FileScanRDD,MapPartitionsRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__accuaccu__fgf__si.numTasks 4>>>
====================onStageSubmitted====================
====================onTaskStart====================
stageAttempId 0
stageId 1
taskInfo
====================onTaskStart====================
====================onTaskStart====================
stageAttempId 0
stageId 1
taskInfo
====================onTaskStart====================
====================onTaskStart====================
stageAttempId 0
stageId 1
taskInfo
====================onTaskStart====================
====================onTaskStart====================
stageAttempId 0
stageId 1
taskInfo
====================onTaskStart====================
[Stage 1:> (0 + 4) / 4]====================onTaskEnd====================
taskend reason Success
stageId sid 1
taskInfo AccumulableInfo(54,Some(number of output rows),Some(103612),Some(103612),true,true,Some(sql))<==>AccumulableInfo(58,Some(duration total (min, med, max)),Some(644),Some(643),true,true,Some(sql))<==>AccumulableInfo(80,Some(internal.metrics.input.recordsRead),Some(103612),Some(103612),true,true,None)<==>AccumulableInfo(79,Some(internal.metrics.input.bytesRead),Some(2252658),Some(2252658),true,true,None)<==>AccumulableInfo(65,Some(internal.metrics.resultSerializationTime),Some(1),Some(1),true,true,None)<==>AccumulableInfo(64,Some(internal.metrics.jvmGCTime),Some(13),Some(13),true,true,None)<==>AccumulableInfo(63,Some(internal.metrics.resultSize),Some(1527),Some(1527),true,true,None)<==>AccumulableInfo(62,Some(internal.metrics.executorCpuTime),Some(234375000),Some(234375000),true,true,None)<==>AccumulableInfo(61,Some(internal.metrics.executorRunTime),Some(654),Some(654),true,true,None)<==>AccumulableInfo(59,Some(internal.metrics.executorDeserializeTime),Some(18),Some(18),true,true,None)
taskMetrics LongAccumulator(id: 59, name: Some(internal.metrics.executorDeserializeTime), value: 18)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 61, name: Some(internal.metrics.executorRunTime), value: 654)<==>LongAccumulator(id: 62, name: Some(internal.metrics.executorCpuTime), value: 234375000)<==>LongAccumulator(id: 63, name: Some(internal.metrics.resultSize), value: 1527)<==>LongAccumulator(id: 64, name: Some(internal.metrics.jvmGCTime), value: 13)<==>LongAccumulator(id: 65, name: Some(internal.metrics.resultSerializationTime), value: 1)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 79, name: Some(internal.metrics.input.bytesRead), value: 2252658)<==>LongAccumulator(id: 80, name: Some(internal.metrics.input.recordsRead), value: 103612)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 58, name: Some(duration total (min, med, max)), value: 644)<==>SQLMetric(id: 54, name: Some(number of output rows), value: 103612)<==>SQLMetric(id: 55, name: Some(number of files), value: 0)<==>SQLMetric(id: 56, name: Some(metadata time (ms)), value: 0)<==>SQLMetric(id: 57, name: Some(scan time total (min, med, max)), value: -1)
====================onTaskEnd====================
====================onExecutorMetricsUpdate====================
execId driver
1 1 AccumulableInfo(54,Some(number of output rows),Some(159340),None,true,true,Some(sql))
1 2 AccumulableInfo(54,Some(number of output rows),Some(166588),None,true,true,Some(sql))
1 3 AccumulableInfo(54,Some(number of output rows),Some(163819),None,true,true,Some(sql))
====================onExecutorMetricsUpdate====================
[Stage 1:==============> (1 + 3) / 4]====================onTaskEnd====================
taskend reason Success
stageId sid 1
taskInfo AccumulableInfo(54,Some(number of output rows),Some(296289),Some(399901),true,true,Some(sql))<==>AccumulableInfo(58,Some(duration total (min, med, max)),Some(1153),Some(1796),true,true,Some(sql))<==>AccumulableInfo(80,Some(internal.metrics.input.recordsRead),Some(296289),Some(399901),true,true,None)<==>AccumulableInfo(79,Some(internal.metrics.input.bytesRead),Some(6553600),Some(8806258),true,true,None)<==>AccumulableInfo(64,Some(internal.metrics.jvmGCTime),Some(23),Some(36),true,true,None)<==>AccumulableInfo(63,Some(internal.metrics.resultSize),Some(1484),Some(3011),true,true,None)<==>AccumulableInfo(62,Some(internal.metrics.executorCpuTime),Some(609375000),Some(843750000),true,true,None)<==>AccumulableInfo(61,Some(internal.metrics.executorRunTime),Some(1164),Some(1818),true,true,None)<==>AccumulableInfo(59,Some(internal.metrics.executorDeserializeTime),Some(10),Some(28),true,true,None)
taskMetrics LongAccumulator(id: 59, name: Some(internal.metrics.executorDeserializeTime), value: 10)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 61, name: Some(internal.metrics.executorRunTime), value: 1164)<==>LongAccumulator(id: 62, name: Some(internal.metrics.executorCpuTime), value: 609375000)<==>LongAccumulator(id: 63, name: Some(internal.metrics.resultSize), value: 1484)<==>LongAccumulator(id: 64, name: Some(internal.metrics.jvmGCTime), value: 23)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 79, name: Some(internal.metrics.input.bytesRead), value: 6553600)<==>LongAccumulator(id: 80, name: Some(internal.metrics.input.recordsRead), value: 296289)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 58, name: Some(duration total (min, med, max)), value: 1153)<==>SQLMetric(id: 54, name: Some(number of output rows), value: 296289)<==>SQLMetric(id: 55, name: Some(number of files), value: 0)<==>SQLMetric(id: 56, name: Some(metadata time (ms)), value: 0)<==>SQLMetric(id: 57, name: Some(scan time total (min, med, max)), value: -1)
====================onTaskEnd====================
====================onTaskEnd====================
taskend reason Success
stageId sid 1
taskInfo
taskMetrics LongAccumulator(id: 59, name: Some(internal.metrics.executorDeserializeTime), value: 13)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 61, name: Some(internal.metrics.executorRunTime), value: 1164)<==>LongAccumulator(id: 62, name: Some(internal.metrics.executorCpuTime), value: 625000000)<==>LongAccumulator(id: 63, name: Some(internal.metrics.resultSize), value: 1484)<==>LongAccumulator(id: 64, name: Some(internal.metrics.jvmGCTime), value: 23)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 79, name: Some(internal.metrics.input.bytesRead), value: 6553600)<==>LongAccumulator(id: 80, name: Some(internal.metrics.input.recordsRead), value: 296402)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 58, name: Some(duration total (min, med, max)), value: 1153)<==>SQLMetric(id: 54, name: Some(number of output rows), value: 296402)<==>SQLMetric(id: 55, name: Some(number of files), value: 0)<==>SQLMetric(id: 56, name: Some(metadata time (ms)), value: 0)<==>SQLMetric(id: 57, name: Some(scan time total (min, med, max)), value: -1)
====================onTaskEnd====================
====================onTaskEnd====================
taskend reason Success
stageId sid 1
taskInfo AccumulableInfo(54,Some(number of output rows),Some(303907),Some(1000210),true,true,Some(sql))<==>AccumulableInfo(58,Some(duration total (min, med, max)),Some(1172),Some(4121),true,true,Some(sql))<==>AccumulableInfo(80,Some(internal.metrics.input.recordsRead),Some(303907),Some(1000210),true,true,None)<==>AccumulableInfo(79,Some(internal.metrics.input.bytesRead),Some(6553600),Some(21913458),true,true,None)<==>AccumulableInfo(64,Some(internal.metrics.jvmGCTime),Some(23),Some(82),true,true,None)<==>AccumulableInfo(63,Some(internal.metrics.resultSize),Some(1527),Some(6022),true,true,None)<==>AccumulableInfo(62,Some(internal.metrics.executorCpuTime),Some(765625000),Some(2234375000),true,true,None)<==>AccumulableInfo(61,Some(internal.metrics.executorRunTime),Some(1188),Some(4170),true,true,None)<==>AccumulableInfo(60,Some(internal.metrics.executorDeserializeCpuTime),Some(15625000),Some(15625000),true,true,None)<==>AccumulableInfo(59,Some(internal.metrics.executorDeserializeTime),Some(9),Some(50),true,true,None)
taskMetrics LongAccumulator(id: 59, name: Some(internal.metrics.executorDeserializeTime), value: 9)<==>LongAccumulator(id: 60, name: Some(internal.metrics.executorDeserializeCpuTime), value: 15625000)<==>LongAccumulator(id: 61, name: Some(internal.metrics.executorRunTime), value: 1188)<==>LongAccumulator(id: 62, name: Some(internal.metrics.executorCpuTime), value: 765625000)<==>LongAccumulator(id: 63, name: Some(internal.metrics.resultSize), value: 1527)<==>LongAccumulator(id: 64, name: Some(internal.metrics.jvmGCTime), value: 23)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 79, name: Some(internal.metrics.input.bytesRead), value: 6553600)<==>LongAccumulator(id: 80, name: Some(internal.metrics.input.recordsRead), value: 303907)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 58, name: Some(duration total (min, med, max)), value: 1172)<==>SQLMetric(id: 54, name: Some(number of output rows), value: 303907)<==>SQLMetric(id: 55, name: Some(number of files), value: 0)<==>SQLMetric(id: 56, name: Some(metadata time (ms)), value: 0)<==>SQLMetric(id: 57, name: Some(scan time total (min, med, max)), value: -1)
====================onTaskEnd====================
====================onStageCompleted====================
stageId 1
<<<stageId 1__fgf__rddname MapPartitionsRDD,MapPartitionsRDD,MapPartitionsRDD,FileScanRDD,MapPartitionsRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__taskmetricLongAccumulator(id: 59, name: Some(internal.metrics.executorDeserializeTime), value: 50),LongAccumulator(id: 60, name: Some(internal.metrics.executorDeserializeCpuTime), value: 15625000),LongAccumulator(id: 61, name: Some(internal.metrics.executorRunTime), value: 4170),LongAccumulator(id: 62, name: Some(internal.metrics.executorCpuTime), value: 2234375000),LongAccumulator(id: 63, name: Some(internal.metrics.resultSize), value: 6022),LongAccumulator(id: 64, name: Some(internal.metrics.jvmGCTime), value: 82),LongAccumulator(id: 65, name: Some(internal.metrics.resultSerializationTime), value: 1),LongAccumulator(id: 66, name: Some(internal.metrics.memoryBytesSpilled), value: 0),LongAccumulator(id: 67, name: Some(internal.metrics.diskBytesSpilled), value: 0),LongAccumulator(id: 68, name: Some(internal.metrics.peakExecutionMemory), value: 0),CollectionAccumulator(id: 69, name: Some(internal.metrics.updatedBlockStatuses), value: []),LongAccumulator(id: 70, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0),LongAccumulator(id: 71, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 0),LongAccumulator(id: 72, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0),LongAccumulator(id: 73, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 0),LongAccumulator(id: 74, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0),LongAccumulator(id: 75, name: Some(internal.metrics.shuffle.read.recordsRead), value: 0),LongAccumulator(id: 76, name: Some(internal.metrics.shuffle.write.bytesWritten), value: 0),LongAccumulator(id: 77, name: Some(internal.metrics.shuffle.write.recordsWritten), value: 0),LongAccumulator(id: 78, name: Some(internal.metrics.shuffle.write.writeTime), value: 0),LongAccumulator(id: 79, name: Some(internal.metrics.input.bytesRead), value: 21913458),LongAccumulator(id: 80, name: Some(internal.metrics.input.recordsRead), value: 1000210),LongAccumulator(id: 81, name: Some(internal.metrics.output.bytesWritten), value: 0),LongAccumulator(id: 82, name: Some(internal.metrics.output.recordsWritten), value: 0)taskmetric__fgf__accu59 -> AccumulableInfo(59,Some(internal.metrics.executorDeserializeTime),None,Some(50),true,true,None),80 -> AccumulableInfo(80,Some(internal.metrics.input.recordsRead),None,Some(1000210),true,true,None),62 -> AccumulableInfo(62,Some(internal.metrics.executorCpuTime),None,Some(2234375000),true,true,None),65 -> AccumulableInfo(65,Some(internal.metrics.resultSerializationTime),None,Some(1),true,true,None),64 -> AccumulableInfo(64,Some(internal.metrics.jvmGCTime),None,Some(82),true,true,None),58 -> AccumulableInfo(58,Some(duration total (min, med, max)),None,Some(4121),true,true,Some(sql)),79 -> AccumulableInfo(79,Some(internal.metrics.input.bytesRead),None,Some(21913458),true,true,None),61 -> AccumulableInfo(61,Some(internal.metrics.executorRunTime),None,Some(4170),true,true,None),60 -> AccumulableInfo(60,Some(internal.metrics.executorDeserializeCpuTime),None,Some(15625000),true,true,None),54 -> AccumulableInfo(54,Some(number of output rows),None,Some(1000210),true,true,Some(sql)),63 -> AccumulableInfo(63,Some(internal.metrics.resultSize),None,Some(6022),true,true,None)accu__fgf__si.numTasks 4>>>
====================onStageCompleted====================
====================onJobEnd====================
jobEnd.jobId 1
jobEnd.jobResult JobSucceeded
====================onJobEnd====================
====================onJobStart====================
jobStart.jobId = 2
stageId 2
<<<stageId 2__fgf__rddname MapPartitionsRDD,FileScanRDD,MapPartitionsRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__accuaccu__fgf__si.numTasks 1>>>
====================onJobStart====================
====================onStageSubmitted====================
<<<stageId 2__fgf__rddname MapPartitionsRDD,FileScanRDD,MapPartitionsRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__accuaccu__fgf__si.numTasks 1>>>
====================onStageSubmitted====================
====================onTaskStart====================
stageAttempId 0
stageId 2
taskInfo
====================onTaskStart====================
====================onTaskEnd====================
taskend reason Success
stageId sid 2
taskInfo AccumulableInfo(84,Some(number of output rows),Some(22),Some(22),true,true,Some(sql))<==>AccumulableInfo(109,Some(internal.metrics.input.recordsRead),Some(22),Some(22),true,true,None)<==>AccumulableInfo(108,Some(internal.metrics.input.bytesRead),Some(6512496),Some(6512496),true,true,None)<==>AccumulableInfo(92,Some(internal.metrics.resultSize),Some(1310),Some(1310),true,true,None)<==>AccumulableInfo(91,Some(internal.metrics.executorCpuTime),Some(31250000),Some(31250000),true,true,None)<==>AccumulableInfo(90,Some(internal.metrics.executorRunTime),Some(25),Some(25),true,true,None)<==>AccumulableInfo(88,Some(internal.metrics.executorDeserializeTime),Some(4),Some(4),true,true,None)
taskMetrics LongAccumulator(id: 88, name: Some(internal.metrics.executorDeserializeTime), value: 4)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 90, name: Some(internal.metrics.executorRunTime), value: 25)<==>LongAccumulator(id: 91, name: Some(internal.metrics.executorCpuTime), value: 31250000)<==>LongAccumulator(id: 92, name: Some(internal.metrics.resultSize), value: 1310)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 108, name: Some(internal.metrics.input.bytesRead), value: 6512496)<==>LongAccumulator(id: 109, name: Some(internal.metrics.input.recordsRead), value: 22)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 83, name: Some(duration total (min, med, max)), value: -1)<==>SQLMetric(id: 84, name: Some(number of output rows), value: 22)
====================onTaskEnd====================
====================onStageCompleted====================
stageId 2
<<<stageId 2__fgf__rddname MapPartitionsRDD,FileScanRDD,MapPartitionsRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__taskmetricLongAccumulator(id: 88, name: Some(internal.metrics.executorDeserializeTime), value: 4),LongAccumulator(id: 89, name: Some(internal.metrics.executorDeserializeCpuTime), value: 0),LongAccumulator(id: 90, name: Some(internal.metrics.executorRunTime), value: 25),LongAccumulator(id: 91, name: Some(internal.metrics.executorCpuTime), value: 31250000),LongAccumulator(id: 92, name: Some(internal.metrics.resultSize), value: 1310),LongAccumulator(id: 93, name: Some(internal.metrics.jvmGCTime), value: 0),LongAccumulator(id: 94, name: Some(internal.metrics.resultSerializationTime), value: 0),LongAccumulator(id: 95, name: Some(internal.metrics.memoryBytesSpilled), value: 0),LongAccumulator(id: 96, name: Some(internal.metrics.diskBytesSpilled), value: 0),LongAccumulator(id: 97, name: Some(internal.metrics.peakExecutionMemory), value: 0),CollectionAccumulator(id: 98, name: Some(internal.metrics.updatedBlockStatuses), value: []),LongAccumulator(id: 99, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0),LongAccumulator(id: 100, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 0),LongAccumulator(id: 101, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0),LongAccumulator(id: 102, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 0),LongAccumulator(id: 103, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0),LongAccumulator(id: 104, name: Some(internal.metrics.shuffle.read.recordsRead), value: 0),LongAccumulator(id: 105, name: Some(internal.metrics.shuffle.write.bytesWritten), value: 0),LongAccumulator(id: 106, name: Some(internal.metrics.shuffle.write.recordsWritten), value: 0),LongAccumulator(id: 107, name: Some(internal.metrics.shuffle.write.writeTime), value: 0),LongAccumulator(id: 108, name: Some(internal.metrics.input.bytesRead), value: 6512496),LongAccumulator(id: 109, name: Some(internal.metrics.input.recordsRead), value: 22),LongAccumulator(id: 110, name: Some(internal.metrics.output.bytesWritten), value: 0),LongAccumulator(id: 111, name: Some(internal.metrics.output.recordsWritten), value: 0)taskmetric__fgf__accu92 -> AccumulableInfo(92,Some(internal.metrics.resultSize),None,Some(1310),true,true,None),109 -> AccumulableInfo(109,Some(internal.metrics.input.recordsRead),None,Some(22),true,true,None),91 -> AccumulableInfo(91,Some(internal.metrics.executorCpuTime),None,Some(31250000),true,true,None),88 -> AccumulableInfo(88,Some(internal.metrics.executorDeserializeTime),None,Some(4),true,true,None),108 -> AccumulableInfo(108,Some(internal.metrics.input.bytesRead),None,Some(6512496),true,true,None),90 -> AccumulableInfo(90,Some(internal.metrics.executorRunTime),None,Some(25),true,true,None),84 -> AccumulableInfo(84,Some(number of output rows),None,Some(22),true,true,Some(sql))accu__fgf__si.numTasks 1>>>
====================onStageCompleted====================
====================onJobEnd====================
jobEnd.jobId 2
jobEnd.jobResult JobSucceeded
====================onJobEnd====================
+--------+
|movieids|
+--------+
| 1193|
| 661|
| 914|
| 3408|
| 2355|
| 1197|
| 1287|
| 2804|
| 594|
| 919|
| 595|
| 938|
| 2398|
| 2918|
| 1035|
| 2791|
| 2687|
| 2018|
| 3105|
| 2797|
+--------+
only showing top 20 rows
====================onJobStart====================
jobStart.jobId = 3
stageId 3
stageId 4
<<<stageId 3__fgf__rddname MapPartitionsRDD,FileScanRDD,MapPartitionsRDD,MapPartitionsRDD,MapPartitionsRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__accuaccu__fgf__si.numTasks 4>>>
<<<stageId 4__fgf__rddname MapPartitionsRDD,xxxxxxxxxxxxxxxxxxxxxxxxx,ShuffledRDD,CoalescedRDD,MapPartitionsRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__accuaccu__fgf__si.numTasks 5>>>
====================onJobStart====================
====================onStageSubmitted====================
<<<stageId 3__fgf__rddname MapPartitionsRDD,FileScanRDD,MapPartitionsRDD,MapPartitionsRDD,MapPartitionsRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__accuaccu__fgf__si.numTasks 4>>>
====================onStageSubmitted====================
====================onTaskStart====================
stageAttempId 0
stageId 3
taskInfo
====================onTaskStart====================
====================onTaskStart====================
stageAttempId 0
stageId 3
taskInfo
====================onTaskStart====================
====================onTaskStart====================
stageAttempId 0
stageId 3
taskInfo
====================onTaskStart====================
====================onTaskStart====================
stageAttempId 0
stageId 3
taskInfo
====================onTaskStart====================
====================onExecutorMetricsUpdate====================
execId driver
3 6 AccumulableInfo(112,Some(number of output rows),Some(10394),None,true,true,Some(sql))
3 7 AccumulableInfo(112,Some(number of output rows),Some(7983),None,true,true,Some(sql))
3 8 AccumulableInfo(112,Some(number of output rows),Some(9630),None,true,true,Some(sql))
3 9 AccumulableInfo(112,Some(number of output rows),Some(10079),None,true,true,Some(sql))
====================onExecutorMetricsUpdate====================
[Stage 3:> (0 + 4) / 4]====================onExecutorMetricsUpdate====================
execId driver
3 6 AccumulableInfo(112,Some(number of output rows),Some(47124),None,true,true,Some(sql))
3 7 AccumulableInfo(112,Some(number of output rows),Some(43589),None,true,true,Some(sql))
3 8 AccumulableInfo(112,Some(number of output rows),Some(48100),None,true,true,Some(sql))
3 9 AccumulableInfo(112,Some(number of output rows),Some(47160),None,true,true,Some(sql))
====================onExecutorMetricsUpdate====================
====================onTaskEnd====================
taskend reason Success
stageId sid 3
taskInfo AccumulableInfo(112,Some(number of output rows),Some(103612),Some(103612),true,true,Some(sql))<==>AccumulableInfo(116,Some(duration total (min, med, max)),Some(1617),Some(1616),true,true,Some(sql))<==>AccumulableInfo(139,Some(internal.metrics.input.recordsRead),Some(103612),Some(103612),true,true,None)<==>AccumulableInfo(138,Some(internal.metrics.input.bytesRead),Some(2252658),Some(2252658),true,true,None)<==>AccumulableInfo(137,Some(internal.metrics.shuffle.write.writeTime),Some(98373696),Some(98373696),true,true,None)<==>AccumulableInfo(136,Some(internal.metrics.shuffle.write.recordsWritten),Some(103612),Some(103612),true,true,None)<==>AccumulableInfo(135,Some(internal.metrics.shuffle.write.bytesWritten),Some(2298021),Some(2298021),true,true,None)<==>AccumulableInfo(124,Some(internal.metrics.resultSerializationTime),Some(2),Some(2),true,true,None)<==>AccumulableInfo(123,Some(internal.metrics.jvmGCTime),Some(19),Some(19),true,true,None)<==>AccumulableInfo(122,Some(internal.metrics.resultSize),Some(1628),Some(1628),true,true,None)<==>AccumulableInfo(121,Some(internal.metrics.executorCpuTime),Some(843750000),Some(843750000),true,true,None)<==>AccumulableInfo(120,Some(internal.metrics.executorRunTime),Some(1718),Some(1718),true,true,None)<==>AccumulableInfo(118,Some(internal.metrics.executorDeserializeTime),Some(6),Some(6),true,true,None)
taskMetrics LongAccumulator(id: 118, name: Some(internal.metrics.executorDeserializeTime), value: 6)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 120, name: Some(internal.metrics.executorRunTime), value: 1718)<==>LongAccumulator(id: 121, name: Some(internal.metrics.executorCpuTime), value: 843750000)<==>LongAccumulator(id: 122, name: Some(internal.metrics.resultSize), value: 1628)<==>LongAccumulator(id: 123, name: Some(internal.metrics.jvmGCTime), value: 19)<==>LongAccumulator(id: 124, name: Some(internal.metrics.resultSerializationTime), value: 2)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 135, name: Some(internal.metrics.shuffle.write.bytesWritten), value: 2298021)<==>LongAccumulator(id: 136, name: Some(internal.metrics.shuffle.write.recordsWritten), value: 103612)<==>LongAccumulator(id: 137, name: Some(internal.metrics.shuffle.write.writeTime), value: 98373696)<==>LongAccumulator(id: 138, name: Some(internal.metrics.input.bytesRead), value: 2252658)<==>LongAccumulator(id: 139, name: Some(internal.metrics.input.recordsRead), value: 103612)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 116, name: Some(duration total (min, med, max)), value: 1617)<==>SQLMetric(id: 112, name: Some(number of output rows), value: 103612)<==>SQLMetric(id: 113, name: Some(number of files), value: 0)<==>SQLMetric(id: 114, name: Some(metadata time (ms)), value: 0)<==>SQLMetric(id: 115, name: Some(scan time total (min, med, max)), value: -1)
====================onTaskEnd====================
[Stage 3:==============> (1 + 3) / 4]====================onExecutorMetricsUpdate====================
execId driver
3 6 AccumulableInfo(112,Some(number of output rows),Some(221438),None,true,true,Some(sql))
3 7 AccumulableInfo(112,Some(number of output rows),Some(219572),None,true,true,Some(sql))
3 8 AccumulableInfo(112,Some(number of output rows),Some(225414),None,true,true,Some(sql))
====================onExecutorMetricsUpdate====================
====================onTaskEnd====================
taskend reason Success
stageId sid 3
taskInfo AccumulableInfo(112,Some(number of output rows),Some(296402),Some(400014),true,true,Some(sql))<==>AccumulableInfo(116,Some(duration total (min, med, max)),Some(2541),Some(4157),true,true,Some(sql))<==>AccumulableInfo(139,Some(internal.metrics.input.recordsRead),Some(296402),Some(400014),true,true,None)<==>AccumulableInfo(138,Some(internal.metrics.input.bytesRead),Some(6553600),Some(8806258),true,true,None)<==>AccumulableInfo(137,Some(internal.metrics.shuffle.write.writeTime),Some(82364891),Some(180738587),true,true,None)<==>AccumulableInfo(136,Some(internal.metrics.shuffle.write.recordsWritten),Some(296402),Some(400014),true,true,None)<==>AccumulableInfo(135,Some(internal.metrics.shuffle.write.bytesWritten),Some(6554778),Some(8852799),true,true,None)<==>AccumulableInfo(123,Some(internal.metrics.jvmGCTime),Some(24),Some(43),true,true,None)<==>AccumulableInfo(122,Some(internal.metrics.resultSize),Some(1585),Some(3213),true,true,None)<==>AccumulableInfo(121,Some(internal.metrics.executorCpuTime),Some(1718750000),Some(2562500000),true,true,None)<==>AccumulableInfo(120,Some(internal.metrics.executorRunTime),Some(2610),Some(4328),true,true,None)<==>AccumulableInfo(118,Some(internal.metrics.executorDeserializeTime),Some(3),Some(9),true,true,None)
taskMetrics LongAccumulator(id: 118, name: Some(internal.metrics.executorDeserializeTime), value: 3)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 120, name: Some(internal.metrics.executorRunTime), value: 2610)<==>LongAccumulator(id: 121, name: Some(internal.metrics.executorCpuTime), value: 1718750000)<==>LongAccumulator(id: 122, name: Some(internal.metrics.resultSize), value: 1585)<==>LongAccumulator(id: 123, name: Some(internal.metrics.jvmGCTime), value: 24)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 135, name: Some(internal.metrics.shuffle.write.bytesWritten), value: 6554778)<==>LongAccumulator(id: 136, name: Some(internal.metrics.shuffle.write.recordsWritten), value: 296402)<==>LongAccumulator(id: 137, name: Some(internal.metrics.shuffle.write.writeTime), value: 82364891)<==>LongAccumulator(id: 138, name: Some(internal.metrics.input.bytesRead), value: 6553600)<==>LongAccumulator(id: 139, name: Some(internal.metrics.input.recordsRead), value: 296402)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 116, name: Some(duration total (min, med, max)), value: 2541)<==>SQLMetric(id: 112, name: Some(number of output rows), value: 296402)<==>SQLMetric(id: 113, name: Some(number of files), value: 0)<==>SQLMetric(id: 114, name: Some(metadata time (ms)), value: 0)<==>SQLMetric(id: 115, name: Some(scan time total (min, med, max)), value: -1)
====================onTaskEnd====================
[Stage 3:=============================> (2 + 2) / 4]====================onTaskEnd====================
taskend reason Success
stageId sid 3
taskInfo AccumulableInfo(112,Some(number of output rows),Some(296289),Some(696303),true,true,Some(sql))<==>AccumulableInfo(116,Some(duration total (min, med, max)),Some(2578),Some(6735),true,true,Some(sql))<==>AccumulableInfo(139,Some(internal.metrics.input.recordsRead),Some(296289),Some(696303),true,true,None)<==>AccumulableInfo(138,Some(internal.metrics.input.bytesRead),Some(6553600),Some(15359858),true,true,None)<==>AccumulableInfo(137,Some(internal.metrics.shuffle.write.writeTime),Some(91972656),Some(272711243),true,true,None)<==>AccumulableInfo(136,Some(internal.metrics.shuffle.write.recordsWritten),Some(296289),Some(696303),true,true,None)<==>AccumulableInfo(135,Some(internal.metrics.shuffle.write.bytesWritten),Some(6576036),Some(15428835),true,true,None)<==>AccumulableInfo(123,Some(internal.metrics.jvmGCTime),Some(24),Some(67),true,true,None)<==>AccumulableInfo(122,Some(internal.metrics.resultSize),Some(1585),Some(4798),true,true,None)<==>AccumulableInfo(121,Some(internal.metrics.executorCpuTime),Some(1750000000),Some(4312500000),true,true,None)<==>AccumulableInfo(120,Some(internal.metrics.executorRunTime),Some(2661),Some(6989),true,true,None)<==>AccumulableInfo(118,Some(internal.metrics.executorDeserializeTime),Some(7),Some(16),true,true,None)
taskMetrics LongAccumulator(id: 118, name: Some(internal.metrics.executorDeserializeTime), value: 7)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 120, name: Some(internal.metrics.executorRunTime), value: 2661)<==>LongAccumulator(id: 121, name: Some(internal.metrics.executorCpuTime), value: 1750000000)<==>LongAccumulator(id: 122, name: Some(internal.metrics.resultSize), value: 1585)<==>LongAccumulator(id: 123, name: Some(internal.metrics.jvmGCTime), value: 24)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 135, name: Some(internal.metrics.shuffle.write.bytesWritten), value: 6576036)<==>LongAccumulator(id: 136, name: Some(internal.metrics.shuffle.write.recordsWritten), value: 296289)<==>LongAccumulator(id: 137, name: Some(internal.metrics.shuffle.write.writeTime), value: 91972656)<==>LongAccumulator(id: 138, name: Some(internal.metrics.input.bytesRead), value: 6553600)<==>LongAccumulator(id: 139, name: Some(internal.metrics.input.recordsRead), value: 296289)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 116, name: Some(duration total (min, med, max)), value: 2578)<==>SQLMetric(id: 112, name: Some(number of output rows), value: 296289)<==>SQLMetric(id: 113, name: Some(number of files), value: 0)<==>SQLMetric(id: 114, name: Some(metadata time (ms)), value: 0)<==>SQLMetric(id: 115, name: Some(scan time total (min, med, max)), value: -1)
====================onTaskEnd====================
====================onTaskEnd====================
taskend reason Success
stageId sid 3
taskInfo AccumulableInfo(112,Some(number of output rows),Some(303906),Some(1000209),true,true,Some(sql))<==>AccumulableInfo(116,Some(duration total (min, med, max)),Some(2600),Some(9335),true,true,Some(sql))<==>AccumulableInfo(139,Some(internal.metrics.input.recordsRead),Some(303906),Some(1000209),true,true,None)<==>AccumulableInfo(138,Some(internal.metrics.input.bytesRead),Some(6553600),Some(21913458),true,true,None)<==>AccumulableInfo(137,Some(internal.metrics.shuffle.write.writeTime),Some(80225232),Some(352936475),true,true,None)<==>AccumulableInfo(136,Some(internal.metrics.shuffle.write.recordsWritten),Some(303906),Some(1000209),true,true,None)<==>AccumulableInfo(135,Some(internal.metrics.shuffle.write.bytesWritten),Some(6745230),Some(22174065),true,true,None)<==>AccumulableInfo(123,Some(internal.metrics.jvmGCTime),Some(24),Some(91),true,true,None)<==>AccumulableInfo(122,Some(internal.metrics.resultSize),Some(1585),Some(6383),true,true,None)<==>AccumulableInfo(121,Some(internal.metrics.executorCpuTime),Some(1765625000),Some(6078125000),true,true,None)<==>AccumulableInfo(120,Some(internal.metrics.executorRunTime),Some(2674),Some(9663),true,true,None)<==>AccumulableInfo(118,Some(internal.metrics.executorDeserializeTime),Some(5),Some(21),true,true,None)
taskMetrics LongAccumulator(id: 118, name: Some(internal.metrics.executorDeserializeTime), value: 5)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 120, name: Some(internal.metrics.executorRunTime), value: 2674)<==>LongAccumulator(id: 121, name: Some(internal.metrics.executorCpuTime), value: 1765625000)<==>LongAccumulator(id: 122, name: Some(internal.metrics.resultSize), value: 1585)<==>LongAccumulator(id: 123, name: Some(internal.metrics.jvmGCTime), value: 24)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 135, name: Some(internal.metrics.shuffle.write.bytesWritten), value: 6745230)<==>LongAccumulator(id: 136, name: Some(internal.metrics.shuffle.write.recordsWritten), value: 303906)<==>LongAccumulator(id: 137, name: Some(internal.metrics.shuffle.write.writeTime), value: 80225232)<==>LongAccumulator(id: 138, name: Some(internal.metrics.input.bytesRead), value: 6553600)<==>LongAccumulator(id: 139, name: Some(internal.metrics.input.recordsRead), value: 303906)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 116, name: Some(duration total (min, med, max)), value: 2600)<==>SQLMetric(id: 112, name: Some(number of output rows), value: 303906)<==>SQLMetric(id: 113, name: Some(number of files), value: 0)<==>SQLMetric(id: 114, name: Some(metadata time (ms)), value: 0)<==>SQLMetric(id: 115, name: Some(scan time total (min, med, max)), value: -1)
====================onTaskEnd====================
====================onStageCompleted====================
stageId 3
<<<stageId 3__fgf__rddname MapPartitionsRDD,FileScanRDD,MapPartitionsRDD,MapPartitionsRDD,MapPartitionsRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__taskmetricLongAccumulator(id: 118, name: Some(internal.metrics.executorDeserializeTime), value: 21),LongAccumulator(id: 119, name: Some(internal.metrics.executorDeserializeCpuTime), value: 0),LongAccumulator(id: 120, name: Some(internal.metrics.executorRunTime), value: 9663),LongAccumulator(id: 121, name: Some(internal.metrics.executorCpuTime), value: 6078125000),LongAccumulator(id: 122, name: Some(internal.metrics.resultSize), value: 6383),LongAccumulator(id: 123, name: Some(internal.metrics.jvmGCTime), value: 91),LongAccumulator(id: 124, name: Some(internal.metrics.resultSerializationTime), value: 2),LongAccumulator(id: 125, name: Some(internal.metrics.memoryBytesSpilled), value: 0),LongAccumulator(id: 126, name: Some(internal.metrics.diskBytesSpilled), value: 0),LongAccumulator(id: 127, name: Some(internal.metrics.peakExecutionMemory), value: 0),CollectionAccumulator(id: 128, name: Some(internal.metrics.updatedBlockStatuses), value: []),LongAccumulator(id: 129, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0),LongAccumulator(id: 130, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 0),LongAccumulator(id: 131, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0),LongAccumulator(id: 132, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 0),LongAccumulator(id: 133, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0),LongAccumulator(id: 134, name: Some(internal.metrics.shuffle.read.recordsRead), value: 0),LongAccumulator(id: 135, name: Some(internal.metrics.shuffle.write.bytesWritten), value: 22174065),LongAccumulator(id: 136, name: Some(internal.metrics.shuffle.write.recordsWritten), value: 1000209),LongAccumulator(id: 137, name: Some(internal.metrics.shuffle.write.writeTime), value: 352936475),LongAccumulator(id: 138, name: Some(internal.metrics.input.bytesRead), value: 21913458),LongAccumulator(id: 139, name: Some(internal.metrics.input.recordsRead), value: 1000209),LongAccumulator(id: 140, name: Some(internal.metrics.output.bytesWritten), value: 0),LongAccumulator(id: 141, name: Some(internal.metrics.output.recordsWritten), value: 0)taskmetric__fgf__accu137 -> AccumulableInfo(137,Some(internal.metrics.shuffle.write.writeTime),None,Some(352936475),true,true,None),122 -> AccumulableInfo(122,Some(internal.metrics.resultSize),None,Some(6383),true,true,None),116 -> AccumulableInfo(116,Some(duration total (min, med, max)),None,Some(9335),true,true,Some(sql)),124 -> AccumulableInfo(124,Some(internal.metrics.resultSerializationTime),None,Some(2),true,true,None),118 -> AccumulableInfo(118,Some(internal.metrics.executorDeserializeTime),None,Some(21),true,true,None),136 -> AccumulableInfo(136,Some(internal.metrics.shuffle.write.recordsWritten),None,Some(1000209),true,true,None),139 -> AccumulableInfo(139,Some(internal.metrics.input.recordsRead),None,Some(1000209),true,true,None),121 -> AccumulableInfo(121,Some(internal.metrics.executorCpuTime),None,Some(6078125000),true,true,None),112 -> AccumulableInfo(112,Some(number of output rows),None,Some(1000209),true,true,Some(sql)),120 -> AccumulableInfo(120,Some(internal.metrics.executorRunTime),None,Some(9663),true,true,None),138 -> AccumulableInfo(138,Some(internal.metrics.input.bytesRead),None,Some(21913458),true,true,None),123 -> AccumulableInfo(123,Some(internal.metrics.jvmGCTime),None,Some(91),true,true,None),135 -> AccumulableInfo(135,Some(internal.metrics.shuffle.write.bytesWritten),None,Some(22174065),true,true,None)accu__fgf__si.numTasks 4>>>
====================onStageCompleted====================
====================onStageSubmitted====================
<<<stageId 4__fgf__rddname MapPartitionsRDD,xxxxxxxxxxxxxxxxxxxxxxxxx,ShuffledRDD,CoalescedRDD,MapPartitionsRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__accuaccu__fgf__si.numTasks 5>>>
====================onStageSubmitted====================
====================onTaskStart====================
stageAttempId 0
stageId 4
taskInfo
====================onTaskStart====================
====================onTaskStart====================
stageAttempId 0
stageId 4
taskInfo
====================onTaskStart====================
====================onTaskStart====================
stageAttempId 0
stageId 4
taskInfo
====================onTaskStart====================
====================onTaskStart====================
stageAttempId 0
stageId 4
taskInfo
====================onTaskStart====================
[Stage 4:> (0 + 4) / 5]====================onExecutorMetricsUpdate====================
execId driver
4 10 AccumulableInfo(117,Some(number of output rows),Some(8707),None,true,true,Some(sql))
4 11 AccumulableInfo(117,Some(number of output rows),Some(12415),None,true,true,Some(sql))
4 12 AccumulableInfo(117,Some(number of output rows),Some(11606),None,true,true,Some(sql))
4 13 AccumulableInfo(117,Some(number of output rows),Some(9539),None,true,true,Some(sql))
====================onExecutorMetricsUpdate====================
====================onExecutorMetricsUpdate====================
execId driver
4 10 AccumulableInfo(117,Some(number of output rows),Some(50731),None,true,true,Some(sql))
4 11 AccumulableInfo(117,Some(number of output rows),Some(50721),None,true,true,Some(sql))
4 12 AccumulableInfo(117,Some(number of output rows),Some(51519),None,true,true,Some(sql))
4 13 AccumulableInfo(117,Some(number of output rows),Some(55871),None,true,true,Some(sql))
====================onExecutorMetricsUpdate====================
====================onTaskStart====================
stageAttempId 0
stageId 4
taskInfo
====================onTaskStart====================
====================onTaskEnd====================
taskend reason Success
stageId sid 4
taskInfo AccumulableInfo(117,Some(number of output rows),Some(200041),Some(200041),true,true,Some(sql))<==>AccumulableInfo(158,Some(internal.metrics.shuffle.read.recordsRead),Some(200041),Some(200041),true,true,None)<==>AccumulableInfo(157,Some(internal.metrics.shuffle.read.fetchWaitTime),Some(0),Some(0),true,true,None)<==>AccumulableInfo(156,Some(internal.metrics.shuffle.read.localBytesRead),Some(4436289),Some(4436289),true,true,None)<==>AccumulableInfo(155,Some(internal.metrics.shuffle.read.remoteBytesRead),Some(0),Some(0),true,true,None)<==>AccumulableInfo(154,Some(internal.metrics.shuffle.read.localBlocksFetched),Some(4),Some(4),true,true,None)<==>AccumulableInfo(153,Some(internal.metrics.shuffle.read.remoteBlocksFetched),Some(0),Some(0),true,true,None)<==>AccumulableInfo(148,Some(internal.metrics.resultSerializationTime),Some(2),Some(2),true,true,None)<==>AccumulableInfo(147,Some(internal.metrics.jvmGCTime),Some(46),Some(46),true,true,None)<==>AccumulableInfo(146,Some(internal.metrics.resultSize),Some(1686),Some(1686),true,true,None)<==>AccumulableInfo(145,Some(internal.metrics.executorCpuTime),Some(1203125000),Some(1203125000),true,true,None)<==>AccumulableInfo(144,Some(internal.metrics.executorRunTime),Some(2309),Some(2309),true,true,None)<==>AccumulableInfo(142,Some(internal.metrics.executorDeserializeTime),Some(52),Some(52),true,true,None)
taskMetrics LongAccumulator(id: 142, name: Some(internal.metrics.executorDeserializeTime), value: 52)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 144, name: Some(internal.metrics.executorRunTime), value: 2309)<==>LongAccumulator(id: 145, name: Some(internal.metrics.executorCpuTime), value: 1203125000)<==>LongAccumulator(id: 146, name: Some(internal.metrics.resultSize), value: 1686)<==>LongAccumulator(id: 147, name: Some(internal.metrics.jvmGCTime), value: 46)<==>LongAccumulator(id: 148, name: Some(internal.metrics.resultSerializationTime), value: 2)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>LongAccumulator(id: 153, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0)<==>LongAccumulator(id: 154, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 4)<==>LongAccumulator(id: 155, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0)<==>LongAccumulator(id: 156, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 4436289)<==>LongAccumulator(id: 157, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0)<==>LongAccumulator(id: 158, name: Some(internal.metrics.shuffle.read.recordsRead), value: 200041)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 117, name: Some(number of output rows), value: 200041)
====================onTaskEnd====================
====================onTaskEnd====================
taskend reason Success
stageId sid 4
taskInfo AccumulableInfo(117,Some(number of output rows),Some(200041),Some(400082),true,true,Some(sql))<==>AccumulableInfo(158,Some(internal.metrics.shuffle.read.recordsRead),Some(200041),Some(400082),true,true,None)<==>AccumulableInfo(157,Some(internal.metrics.shuffle.read.fetchWaitTime),Some(0),Some(0),true,true,None)<==>AccumulableInfo(156,Some(internal.metrics.shuffle.read.localBytesRead),Some(4434760),Some(8871049),true,true,None)<==>AccumulableInfo(155,Some(internal.metrics.shuffle.read.remoteBytesRead),Some(0),Some(0),true,true,None)<==>AccumulableInfo(154,Some(internal.metrics.shuffle.read.localBlocksFetched),Some(4),Some(8),true,true,None)<==>AccumulableInfo(153,Some(internal.metrics.shuffle.read.remoteBlocksFetched),Some(0),Some(0),true,true,None)<==>AccumulableInfo(147,Some(internal.metrics.jvmGCTime),Some(46),Some(92),true,true,None)<==>AccumulableInfo(146,Some(internal.metrics.resultSize),Some(1643),Some(3329),true,true,None)<==>AccumulableInfo(145,Some(internal.metrics.executorCpuTime),Some(1421875000),Some(2625000000),true,true,None)<==>AccumulableInfo(144,Some(internal.metrics.executorRunTime),Some(2335),Some(4644),true,true,None)<==>AccumulableInfo(142,Some(internal.metrics.executorDeserializeTime),Some(43),Some(95),true,true,None)
taskMetrics LongAccumulator(id: 142, name: Some(internal.metrics.executorDeserializeTime), value: 43)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 144, name: Some(internal.metrics.executorRunTime), value: 2335)<==>LongAccumulator(id: 145, name: Some(internal.metrics.executorCpuTime), value: 1421875000)<==>LongAccumulator(id: 146, name: Some(internal.metrics.resultSize), value: 1643)<==>LongAccumulator(id: 147, name: Some(internal.metrics.jvmGCTime), value: 46)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>LongAccumulator(id: 153, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0)<==>LongAccumulator(id: 154, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 4)<==>LongAccumulator(id: 155, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0)<==>LongAccumulator(id: 156, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 4434760)<==>LongAccumulator(id: 157, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0)<==>LongAccumulator(id: 158, name: Some(internal.metrics.shuffle.read.recordsRead), value: 200041)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 117, name: Some(number of output rows), value: 200041)
====================onTaskEnd====================
[Stage 4:=======================> (2 + 3) / 5]====================onTaskEnd====================
taskend reason Success
stageId sid 4
taskInfo AccumulableInfo(117,Some(number of output rows),Some(200043),Some(600125),true,true,Some(sql))<==>AccumulableInfo(158,Some(internal.metrics.shuffle.read.recordsRead),Some(200043),Some(600125),true,true,None)<==>AccumulableInfo(157,Some(internal.metrics.shuffle.read.fetchWaitTime),Some(0),Some(0),true,true,None)<==>AccumulableInfo(156,Some(internal.metrics.shuffle.read.localBytesRead),Some(4434141),Some(13305190),true,true,None)<==>AccumulableInfo(155,Some(internal.metrics.shuffle.read.remoteBytesRead),Some(0),Some(0),true,true,None)<==>AccumulableInfo(154,Some(internal.metrics.shuffle.read.localBlocksFetched),Some(4),Some(12),true,true,None)<==>AccumulableInfo(153,Some(internal.metrics.shuffle.read.remoteBlocksFetched),Some(0),Some(0),true,true,None)<==>AccumulableInfo(147,Some(internal.metrics.jvmGCTime),Some(46),Some(138),true,true,None)<==>AccumulableInfo(146,Some(internal.metrics.resultSize),Some(1686),Some(5015),true,true,None)<==>AccumulableInfo(145,Some(internal.metrics.executorCpuTime),Some(1328125000),Some(3953125000),true,true,None)<==>AccumulableInfo(144,Some(internal.metrics.executorRunTime),Some(2362),Some(7006),true,true,None)<==>AccumulableInfo(143,Some(internal.metrics.executorDeserializeCpuTime),Some(15625000),Some(15625000),true,true,None)<==>AccumulableInfo(142,Some(internal.metrics.executorDeserializeTime),Some(46),Some(141),true,true,None)
taskMetrics LongAccumulator(id: 142, name: Some(internal.metrics.executorDeserializeTime), value: 46)<==>LongAccumulator(id: 143, name: Some(internal.metrics.executorDeserializeCpuTime), value: 15625000)<==>LongAccumulator(id: 144, name: Some(internal.metrics.executorRunTime), value: 2362)<==>LongAccumulator(id: 145, name: Some(internal.metrics.executorCpuTime), value: 1328125000)<==>LongAccumulator(id: 146, name: Some(internal.metrics.resultSize), value: 1686)<==>LongAccumulator(id: 147, name: Some(internal.metrics.jvmGCTime), value: 46)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>LongAccumulator(id: 153, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0)<==>LongAccumulator(id: 154, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 4)<==>LongAccumulator(id: 155, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0)<==>LongAccumulator(id: 156, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 4434141)<==>LongAccumulator(id: 157, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0)<==>LongAccumulator(id: 158, name: Some(internal.metrics.shuffle.read.recordsRead), value: 200043)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 117, name: Some(number of output rows), value: 200043)
====================onTaskEnd====================
====================onTaskEnd====================
taskend reason Success
stageId sid 4
taskInfo AccumulableInfo(117,Some(number of output rows),Some(200042),Some(800167),true,true,Some(sql))<==>AccumulableInfo(158,Some(internal.metrics.shuffle.read.recordsRead),Some(200042),Some(800167),true,true,None)<==>AccumulableInfo(157,Some(internal.metrics.shuffle.read.fetchWaitTime),Some(0),Some(0),true,true,None)<==>AccumulableInfo(156,Some(internal.metrics.shuffle.read.localBytesRead),Some(4435220),Some(17740410),true,true,None)<==>AccumulableInfo(155,Some(internal.metrics.shuffle.read.remoteBytesRead),Some(0),Some(0),true,true,None)<==>AccumulableInfo(154,Some(internal.metrics.shuffle.read.localBlocksFetched),Some(4),Some(16),true,true,None)<==>AccumulableInfo(153,Some(internal.metrics.shuffle.read.remoteBlocksFetched),Some(0),Some(0),true,true,None)<==>AccumulableInfo(147,Some(internal.metrics.jvmGCTime),Some(46),Some(184),true,true,None)<==>AccumulableInfo(146,Some(internal.metrics.resultSize),Some(1686),Some(6701),true,true,None)<==>AccumulableInfo(145,Some(internal.metrics.executorCpuTime),Some(1343750000),Some(5296875000),true,true,None)<==>AccumulableInfo(144,Some(internal.metrics.executorRunTime),Some(2385),Some(9391),true,true,None)<==>AccumulableInfo(143,Some(internal.metrics.executorDeserializeCpuTime),Some(15625000),Some(31250000),true,true,None)<==>AccumulableInfo(142,Some(internal.metrics.executorDeserializeTime),Some(41),Some(182),true,true,None)
taskMetrics LongAccumulator(id: 142, name: Some(internal.metrics.executorDeserializeTime), value: 41)<==>LongAccumulator(id: 143, name: Some(internal.metrics.executorDeserializeCpuTime), value: 15625000)<==>LongAccumulator(id: 144, name: Some(internal.metrics.executorRunTime), value: 2385)<==>LongAccumulator(id: 145, name: Some(internal.metrics.executorCpuTime), value: 1343750000)<==>LongAccumulator(id: 146, name: Some(internal.metrics.resultSize), value: 1686)<==>LongAccumulator(id: 147, name: Some(internal.metrics.jvmGCTime), value: 46)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>LongAccumulator(id: 153, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0)<==>LongAccumulator(id: 154, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 4)<==>LongAccumulator(id: 155, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0)<==>LongAccumulator(id: 156, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 4435220)<==>LongAccumulator(id: 157, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0)<==>LongAccumulator(id: 158, name: Some(internal.metrics.shuffle.read.recordsRead), value: 200042)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 117, name: Some(number of output rows), value: 200042)
====================onTaskEnd====================
====================onExecutorMetricsUpdate====================
execId driver
4 14 AccumulableInfo(117,Some(number of output rows),Some(20678),None,true,true,Some(sql))
====================onExecutorMetricsUpdate====================
[Stage 4:===============================================> (4 + 1) / 5]====================onTaskEnd====================
taskend reason Success
stageId sid 4
taskInfo AccumulableInfo(117,Some(number of output rows),Some(200042),Some(1000209),true,true,Some(sql))<==>AccumulableInfo(158,Some(internal.metrics.shuffle.read.recordsRead),Some(200042),Some(1000209),true,true,None)<==>AccumulableInfo(157,Some(internal.metrics.shuffle.read.fetchWaitTime),Some(0),Some(0),true,true,None)<==>AccumulableInfo(156,Some(internal.metrics.shuffle.read.localBytesRead),Some(4433655),Some(22174065),true,true,None)<==>AccumulableInfo(155,Some(internal.metrics.shuffle.read.remoteBytesRead),Some(0),Some(0),true,true,None)<==>AccumulableInfo(154,Some(internal.metrics.shuffle.read.localBlocksFetched),Some(4),Some(20),true,true,None)<==>AccumulableInfo(153,Some(internal.metrics.shuffle.read.remoteBlocksFetched),Some(0),Some(0),true,true,None)<==>AccumulableInfo(146,Some(internal.metrics.resultSize),Some(1643),Some(8344),true,true,None)<==>AccumulableInfo(145,Some(internal.metrics.executorCpuTime),Some(687500000),Some(5984375000),true,true,None)<==>AccumulableInfo(144,Some(internal.metrics.executorRunTime),Some(715),Some(10106),true,true,None)<==>AccumulableInfo(143,Some(internal.metrics.executorDeserializeCpuTime),Some(15625000),Some(46875000),true,true,None)<==>AccumulableInfo(142,Some(internal.metrics.executorDeserializeTime),Some(12),Some(194),true,true,None)
taskMetrics LongAccumulator(id: 142, name: Some(internal.metrics.executorDeserializeTime), value: 12)<==>LongAccumulator(id: 143, name: Some(internal.metrics.executorDeserializeCpuTime), value: 15625000)<==>LongAccumulator(id: 144, name: Some(internal.metrics.executorRunTime), value: 715)<==>LongAccumulator(id: 145, name: Some(internal.metrics.executorCpuTime), value: 687500000)<==>LongAccumulator(id: 146, name: Some(internal.metrics.resultSize), value: 1643)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>LongAccumulator(id: 153, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0)<==>LongAccumulator(id: 154, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 4)<==>LongAccumulator(id: 155, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0)<==>LongAccumulator(id: 156, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 4433655)<==>LongAccumulator(id: 157, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0)<==>LongAccumulator(id: 158, name: Some(internal.metrics.shuffle.read.recordsRead), value: 200042)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 117, name: Some(number of output rows), value: 200042)
====================onTaskEnd====================
====================onStageCompleted====================
stageId 4
<<<stageId 4__fgf__rddname MapPartitionsRDD,xxxxxxxxxxxxxxxxxxxxxxxxx,ShuffledRDD,CoalescedRDD,MapPartitionsRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__taskmetricLongAccumulator(id: 142, name: Some(internal.metrics.executorDeserializeTime), value: 194),LongAccumulator(id: 143, name: Some(internal.metrics.executorDeserializeCpuTime), value: 46875000),LongAccumulator(id: 144, name: Some(internal.metrics.executorRunTime), value: 10106),LongAccumulator(id: 145, name: Some(internal.metrics.executorCpuTime), value: 5984375000),LongAccumulator(id: 146, name: Some(internal.metrics.resultSize), value: 8344),LongAccumulator(id: 147, name: Some(internal.metrics.jvmGCTime), value: 184),LongAccumulator(id: 148, name: Some(internal.metrics.resultSerializationTime), value: 2),LongAccumulator(id: 149, name: Some(internal.metrics.memoryBytesSpilled), value: 0),LongAccumulator(id: 150, name: Some(internal.metrics.diskBytesSpilled), value: 0),LongAccumulator(id: 151, name: Some(internal.metrics.peakExecutionMemory), value: 0),CollectionAccumulator(id: 152, name: Some(internal.metrics.updatedBlockStatuses), value: []),LongAccumulator(id: 153, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0),LongAccumulator(id: 154, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 20),LongAccumulator(id: 155, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0),LongAccumulator(id: 156, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 22174065),LongAccumulator(id: 157, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0),LongAccumulator(id: 158, name: Some(internal.metrics.shuffle.read.recordsRead), value: 1000209),LongAccumulator(id: 159, name: Some(internal.metrics.shuffle.write.bytesWritten), value: 0),LongAccumulator(id: 160, name: Some(internal.metrics.shuffle.write.recordsWritten), value: 0),LongAccumulator(id: 161, name: Some(internal.metrics.shuffle.write.writeTime), value: 0),LongAccumulator(id: 162, name: Some(internal.metrics.input.bytesRead), value: 0),LongAccumulator(id: 163, name: Some(internal.metrics.input.recordsRead), value: 0),LongAccumulator(id: 164, name: Some(internal.metrics.output.bytesWritten), value: 0),LongAccumulator(id: 165, name: Some(internal.metrics.output.recordsWritten), value: 0)taskmetric__fgf__accu146 -> AccumulableInfo(146,Some(internal.metrics.resultSize),None,Some(8344),true,true,None),155 -> AccumulableInfo(155,Some(internal.metrics.shuffle.read.remoteBytesRead),None,Some(0),true,true,None),158 -> AccumulableInfo(158,Some(internal.metrics.shuffle.read.recordsRead),None,Some(1000209),true,true,None),142 -> AccumulableInfo(142,Some(internal.metrics.executorDeserializeTime),None,Some(194),true,true,None),145 -> AccumulableInfo(145,Some(internal.metrics.executorCpuTime),None,Some(5984375000),true,true,None),154 -> AccumulableInfo(154,Some(internal.metrics.shuffle.read.localBlocksFetched),None,Some(20),true,true,None),148 -> AccumulableInfo(148,Some(internal.metrics.resultSerializationTime),None,Some(2),true,true,None),157 -> AccumulableInfo(157,Some(internal.metrics.shuffle.read.fetchWaitTime),None,Some(0),true,true,None),147 -> AccumulableInfo(147,Some(internal.metrics.jvmGCTime),None,Some(184),true,true,None),156 -> AccumulableInfo(156,Some(internal.metrics.shuffle.read.localBytesRead),None,Some(22174065),true,true,None),144 -> AccumulableInfo(144,Some(internal.metrics.executorRunTime),None,Some(10106),true,true,None),153 -> AccumulableInfo(153,Some(internal.metrics.shuffle.read.remoteBlocksFetched),None,Some(0),true,true,None),117 -> AccumulableInfo(117,Some(number of output rows),None,Some(1000209),true,true,Some(sql)),143 -> AccumulableInfo(143,Some(internal.metrics.executorDeserializeCpuTime),None,Some(46875000),true,true,None)accu__fgf__si.numTasks 5>>>
====================onStageCompleted====================
====================onJobEnd====================
jobEnd.jobId 3
jobEnd.jobResult JobSucceeded
====================onJobEnd====================
====================onApplicationEnd====================
applicationEnd.time = 1543335661199
====================onApplicationEnd====================
Process finished with exit code 0
以上是关于spark listener的主要内容,如果未能解决你的问题,请参考以下文章