spark listener

Posted jason-dong

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark listener相关的知识,希望对你有一定的参考价值。

最近在做一个需求:当Spark程序在读数据或写数据时,将所读或所写的条数实时地展现出来。这里用到了SparkListener,SparkListener 可以获取Spark 各个运行阶段的状态。

首先通过代码来分析一下各个方法的功能,再来说思路。
package org.apache.spark
 
import org.apache.spark.scheduler._
import org.apache.spark.sql.{SaveMode, SparkSession}
 
object ListenerTest {
  /**
    * Demo driver: reads a CSV file, shows its `movieids` column, rebuilds the
    * DataFrame from a repartitioned and explicitly named RDD, and writes it back
    * out as CSV, with [[BasicJobCounter]] registered as a listener so the
    * scheduler events of every job are printed.
    *
    * @param args args(0): input CSV path (header row, comma separated, expected to
    *             contain a `movieids` column); args(1): output directory, overwritten
    */
  def main(args: Array[String]): Unit = {
    require(args.length >= 2, "usage: ListenerTest <input csv path> <output dir>")
    val inputPath = args(0)
    val outputPath = args(1)

    val conf = new SparkConf().setAppName("test").setMaster("local[*]")
      // registers the listener class through configuration
      .set("spark.extraListeners", classOf[BasicJobCounter].getName)
      // heartbeats trigger onExecutorMetricsUpdate, so a short interval gives more frequent progress
      .set("spark.executor.heartbeatInterval", "1000ms")
    val spark = SparkSession.builder()
      .config(conf)
      .getOrCreate()
    try {
      val csvdf = spark.read.format("csv")
        .option("sep", ",")
        .option("inferSchema", "true")
        .option("header", "true")
        .load(inputPath)
      csvdf.select("movieids").show()
      val schema = csvdf.schema
      // The DataFrame is rebuilt from the named RDD so that the write job actually runs
      // through it; listeners can then recognise the stage by this RDD name.
      val df = spark.createDataFrame(csvdf.rdd.repartition(5).setName("xxxxxxxxxxxxxxxxxxxxxxxxx"), schema)
      df.write.mode(SaveMode.Overwrite)
        .format("csv")
        .option("sep", ",")
        .option("header", "true")
        .save(outputPath)
    } finally {
      // always stop the session, even if the read or write fails
      spark.stop()
    }
  }
}
 
/**
  * Listener that prints a running total of "number of output rows" for one stage,
  * built from the per-task accumulator updates delivered with executor heartbeats.
  *
  * Following the accompanying notes, the latest value reported for each task id is
  * kept (overwritten, not added) and the total is the sum over tasks; adding every
  * heartbeat's value would over-count. The exact final count is only available
  * once the stage completes.
  */
private class OnlyExeUpdata extends SparkListener {

  /** Stage whose output rows are tracked (hard-coded for the demo job). */
  private val TargetStageId = 3L

  /** Name of the SQL metric holding the row count. */
  private val OutputRowsName = "number of output rows"

  /** Latest output-row value reported per task id of the target stage. */
  val map = scala.collection.mutable.Map.empty[Long, Long]

  /**
    * Returns the first accumulator in `list` whose name equals `name`, if any.
    *
    * @param list accumulators to search
    * @param name accumulator name to look for
    */
  def getAccOut(list: List[AccumulableInfo], name: String): Option[AccumulableInfo] =
    list.find(_.name.contains(name))

  /**
    * Extracts the output-row count from one task's accumulator updates.
    * A task can report two "number of output rows" accumulators, one of which may
    * be 0, so the first non-zero value is preferred, falling back to the first one.
    * Updates that are missing or not a Long are ignored instead of throwing.
    */
  private def outputRows(accumUpdates: Seq[AccumulableInfo]): Option[Long] = {
    val values = accumUpdates
      .filter(_.name.contains(OutputRowsName))
      .flatMap(_.update.toList)
      .collect { case v: Long => v }
    values.find(_ != 0L).orElse(values.headOption)
  }

  /**
    * Records the latest output-row count of every reported task of the target
    * stage and prints the current sum over all tasks seen so far.
    *
    * @param executorMetricsUpdate heartbeat payload: executor id plus
    *                              (taskId, stageId, stageAttemptId, accumulator updates) tuples
    */
  override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {
    printx("onExecutorMetricsUpdate")
    println(s"execId  ${executorMetricsUpdate.execId}")
    for ((taskId, stageId, _, accumUpdates) <- executorMetricsUpdate.accumUpdates) {
      println(s"""==${taskId}  ${accumUpdates.mkString("<==>")}==""")
      if (stageId == TargetStageId) {
        outputRows(accumUpdates).foreach { rows =>
          println(s"${taskId} ${rows}")
          // overwrite rather than add: a task's latest report replaces its previous one
          map += taskId -> rows
        }
      }
    }
    if (map.nonEmpty) {
      println(s"sum ${map.values.sum}")
    }

    printx("onExecutorMetricsUpdate")
  }

  /** Prints `label` framed by 20 '=' characters on each side. */
  def printx(label: String): Unit = {
    println(s"=" * 20 + label + s"=" * 20)
  }
}
 
/**
  * Demo listener that prints what each SparkListener callback exposes, grouped by
  * level: application, job, stage, executor and task.
  * ListenerTest registers it through the `spark.extraListeners` setting.
  */
private class BasicJobCounter extends SparkListener {
  // NOTE(review): not read or written anywhere in this class.
  var lacc = 0L
  // app job stage executor task

  //======================application=========================
  /**
    * Called when the application starts.
    * Exposes the app id, app attempt id, app name, driver logs, start time and
    * the user running the application.
    * @param applicationStart the application-start event
    */
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
    printx("onApplicationStart")
    println(s"applicationStart.appAttemptId = ${applicationStart.appAttemptId}")
    println(s"applicationStart.appId = ${applicationStart.appId}")
    println(s"applicationStart.appName = ${applicationStart.appName}")
    println(s"applicationStart.driverLogs = ${applicationStart.driverLogs}")
    println(s"applicationStart.sparkUser = ${applicationStart.sparkUser}")
    println(s"applicationStart.time = ${applicationStart.time}")
    printx("onApplicationStart")
  }

  /**
    * Called when the application ends; exposes the end time.
    * @param applicationEnd the application-end event
    */
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    printx("onApplicationEnd")
    println(s"applicationEnd.time  =  ${applicationEnd.time}")
    printx("onApplicationEnd")
  }

  //======================application=========================
  //======================job===============================
  /**
    * Called when a job starts.
    * Exposes the job id and the stages it contains. For each stage, the stage id,
    * the names of its RDDs, details, name, accumulables and task count are printed.
    * @param jobStart the job-start event
    */
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    printx("onJobStart")
    println(s"jobStart.jobId = ${jobStart.jobId}")
    jobStart.stageIds.foreach(a => println(s"stageId  $a"))
    jobStart.stageInfos.foreach(si => println(describeStage(si, withTaskMetrics = false)))
    printx("onJobStart")
  }

  /**
    * Called when a job ends; exposes the job id and its result (success or failure).
    * @param jobEnd the job-end event
    */
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    printx("onJobEnd")
    println(s"jobEnd.jobId  ${jobEnd.jobId}")
    println(s"jobEnd.jobResult  ${jobEnd.jobResult}")
    printx("onJobEnd")
  }

  //======================job===============================

  //======================stage========================
  /**
    * Called when a stage is submitted; prints the stage's information.
    * @param stageSubmitted the stage-submitted event
    */
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
    printx("onStageSubmitted")
    println(describeStage(stageSubmitted.stageInfo, withTaskMetrics = false))
    printx("onStageSubmitted")
  }

  /**
    * Called when a stage completes.
    * Its accumulables hold the stage's aggregated statistics. These include the
    * number of rows read or written, totalled over the whole stage.
    * @param stageCompleted the stage-completed event
    */
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    printx("onStageCompleted")
    val si = stageCompleted.stageInfo
    println(s"stageId ${si.stageId}")
    println(describeStage(si, withTaskMetrics = true))
    printx("onStageCompleted")
  }

  //======================stage========================


  //===========================executor========================
  /**
    * Called when executor metrics are updated (on executor heartbeats).
    * Exposes the executor id and, per reported task, a
    * (taskId, stageId, stageAttemptId, accumulator updates) tuple. The accumulator
    * updates include the read/write row counts; only "number of output rows" is printed.
    * @param executorMetricsUpdate the metrics-update event
    */
  override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {
    printx("onExecutorMetricsUpdate")
    println(s"execId   ${executorMetricsUpdate.execId}")
    for {
      (taskId, stageId, _, accumUpdates) <- executorMetricsUpdate.accumUpdates
      acc <- accumUpdates if acc.name.contains("number of output rows")
    } {
      println(s"""${stageId} ${taskId} ${acc}""")
    }
    printx("onExecutorMetricsUpdate")
  }

  /**
    * Called when an executor is added; prints its id, host, log URLs and core count.
    * @param executorAdded the executor-added event
    */
  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
    printx("onExecutorAdded")
    println(s"exeId  ${executorAdded.executorId}")
    val exeInfo = executorAdded.executorInfo
    println(s"exeInfo.executorHost ${exeInfo.executorHost}")
    println(s"exeInfo.logUrlMap ${exeInfo.logUrlMap}")
    println(s"exeInfo.totalCores ${exeInfo.totalCores}")
    printx("onExecutorAdded")
  }


  //===========================executor========================

  //======================task===========================
  /**
    * Called when a task starts; prints its stage id, stage attempt id and accumulables.
    * @param taskStart the task-start event
    */
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    printx("onTaskStart")
    println(s"stageAttempId ${taskStart.stageAttemptId}")
    println(s"stageId ${taskStart.stageId}")
    println(s"""taskInfo ${taskStart.taskInfo.accumulables.mkString("<==>")}""")
    printx("onTaskStart")
  }

  /**
    * Called when a task ends; prints the end reason, stage id, task accumulables
    * and task-metric accumulators.
    * @param taskEnd the task-end event
    */
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    printx("onTaskEnd")
    println(s"taskend reason ${taskEnd.reason}")
    println(s"stageId sid ${taskEnd.stageId}")
    println(s"""taskInfo ${taskEnd.taskInfo.accumulables.mkString("<==>")}""")
    // Spark declares taskMetrics nullable (e.g. for failed tasks); guard against an NPE here.
    val metricsText = Option(taskEnd.taskMetrics).fold("null")(_.accumulators().mkString("<==>"))
    println(s"""taskMetrics ${metricsText}""")
    printx("onTaskEnd")
  }

  //======================task===========================

  /**
    * Formats a StageInfo as a single `<<<...>>>` line whose fields are separated by `__fgf__`.
    * @param si              the stage to describe
    * @param withTaskMetrics also include the stage's task-metric accumulators; a null
    *                        taskMetrics is printed as an empty list
    */
  private def describeStage(si: StageInfo, withTaskMetrics: Boolean): String = {
    val taskMetricFields =
      if (withTaskMetrics) {
        Seq(Option(si.taskMetrics).fold("taskmetrictaskmetric")(
          _.accumulators().mkString("taskmetric", ",", "taskmetric")))
      } else {
        Seq.empty[String]
      }
    val fields = Seq(
      s"stageId  ${si.stageId}",
      si.rddInfos.map(_.name).mkString("rddname ", ",", " rddname"),
      s"si.details ${si.details}",
      s"si.name ${si.name}") ++
      taskMetricFields ++
      Seq(
        si.accumulables.mkString("accu", ",", "accu"),
        s"si.numTasks ${si.numTasks}")
    fields.mkString("<<<", "__fgf__", ">>>")
  }

  /** Prints `label` framed by 20 '=' characters on each side. */
  def printx(label: String): Unit = {
    println(s"=" * 20 + label + s"=" * 20)
  }
}
思路是这样的:
1. df 底层所存储的 RDD 是可以命名的。在获取 df 时,比如 val df = spark.read.format("csv")...load(..),对 df 内的 RDD 重命名:df.rdd.setName("你想要取的名字")。注意,需要像示例代码那样用 spark.createDataFrame(命名后的rdd, schema) 重新构造 DataFrame,后续的读写才会经过这个命名的 RDD。这样在 onJobStart 中就可以根据 stageInfo 里的 RDD 名称找到该 df 所在的 stage,而 onExecutorMetricsUpdate 可以实时获取某个 stage 所读/写的条数,只需将这个条数记录下来再进行显示即可。注意:每次心跳上报的是各个 task 当前的累计值,因此要按 taskId 只保留每个 task 最新上报的值,再对所有 task 的值求和(具体见 OnlyExeUpdata);如果把每次上报的值都直接累加,统计的中间结果会大于实际值。onExecutorMetricsUpdate 会在每次心跳反馈时调用一次,因此可以通过设置心跳间隔(spark.executor.heartbeatInterval)来控制该方法被调用的频率。最后的准确值需要在 onStageCompleted 方法中获取。
要启用 SparkListener,可以在配置中添加对应的类:set("spark.extraListeners", classOf[BasicJobCounter].getName)。文字描述不是很详细,具体可以运行代码并结合输出结果来理解。
注:onExecutorMetricsUpdate 对应的 accumUpdates 里可能包含两条 number of output rows 的统计,有时两条值一样,有时一个为 0、一个不为 0。对 df 做 repartition 后就会出现两个 number of output rows,具体原因还不清楚,使用时需注意。我只取了有值的那条 number of output rows;若两条都有值,任选其一即可。
附上运行结果:结果可能有些被编辑器截断了
 
====================onExecutorAdded====================
exeId  driver
exeInfo.executorHost localhost
exeInfo.logUrlMap Map()
exeInfo.totalCores 4
====================onExecutorAdded====================
====================onApplicationStart====================
applicationStart.appAttemptId = None
applicationStart.appId = Some(local-1543335649097)
applicationStart.appName = test
applicationStart.driverLogs = None
applicationStart.sparkUser = MI
applicationStart.time = 1543335646997
====================onApplicationStart====================
====================onExecutorMetricsUpdate====================
execId   driver
====================onExecutorMetricsUpdate====================
====================onExecutorMetricsUpdate====================
execId   driver
====================onExecutorMetricsUpdate====================
====================onExecutorMetricsUpdate====================
execId   driver
====================onExecutorMetricsUpdate====================
====================onJobStart====================
jobStart.jobId = 0
stageId  0
<<<stageId  0__fgf__rddname MapPartitionsRDD,MapPartitionsRDD,FileScanRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__accuaccu__fgf__si.numTasks 1>>>
====================onJobStart====================
====================onStageSubmitted====================
<<<stageId  0__fgf__rddname MapPartitionsRDD,MapPartitionsRDD,FileScanRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__accuaccu__fgf__si.numTasks 1>>>
====================onStageSubmitted====================
====================onTaskStart====================
stageAttempId 0
stageId 0
taskInfo
====================onTaskStart====================
====================onExecutorMetricsUpdate====================
execId   driver
0    0    AccumulableInfo(2,Some(number of output rows),Some(2),None,true,true,Some(sql))
0    0    AccumulableInfo(1,Some(number of output rows),Some(2),None,true,true,Some(sql))
====================onExecutorMetricsUpdate====================
====================onTaskEnd====================
taskend reason Success
stageId sid 0
taskInfo AccumulableInfo(1,Some(number of output rows),Some(2),Some(2),true,true,Some(sql))<==>AccumulableInfo(2,Some(number of output rows),Some(2),Some(2),true,true,Some(sql))<==>AccumulableInfo(27,Some(internal.metrics.input.recordsRead),Some(2),Some(2),true,true,None)<==>AccumulableInfo(26,Some(internal.metrics.input.bytesRead),Some(6512496),Some(6512496),true,true,None)<==>AccumulableInfo(10,Some(internal.metrics.resultSize),Some(1299),Some(1299),true,true,None)<==>AccumulableInfo(9,Some(internal.metrics.executorCpuTime),Some(78125000),Some(78125000),true,true,None)<==>AccumulableInfo(8,Some(internal.metrics.executorRunTime),Some(91),Some(91),true,true,None)<==>AccumulableInfo(7,Some(internal.metrics.executorDeserializeCpuTime),Some(15625000),Some(15625000),true,true,None)<==>AccumulableInfo(6,Some(internal.metrics.executorDeserializeTime),Some(23),Some(23),true,true,None)
taskMetrics LongAccumulator(id: 6, name: Some(internal.metrics.executorDeserializeTime), value: 23)<==>LongAccumulator(id: 7, name: Some(internal.metrics.executorDeserializeCpuTime), value: 15625000)<==>LongAccumulator(id: 8, name: Some(internal.metrics.executorRunTime), value: 91)<==>LongAccumulator(id: 9, name: Some(internal.metrics.executorCpuTime), value: 78125000)<==>LongAccumulator(id: 10, name: Some(internal.metrics.resultSize), value: 1299)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 26, name: Some(internal.metrics.input.bytesRead), value: 6512496)<==>LongAccumulator(id: 27, name: Some(internal.metrics.input.recordsRead), value: 2)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 0, name: Some(duration total (min, med, max)), value: -1)<==>SQLMetric(id: 2, name: Some(number of output rows), value: 2)<==>SQLMetric(id: 1, name: Some(number of output rows), value: 2)
====================onTaskEnd====================
====================onStageCompleted====================
stageId 0
<<<stageId  0__fgf__rddname MapPartitionsRDD,MapPartitionsRDD,FileScanRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__taskmetricLongAccumulator(id: 6, name: Some(internal.metrics.executorDeserializeTime), value: 23),LongAccumulator(id: 7, name: Some(internal.metrics.executorDeserializeCpuTime), value: 15625000),LongAccumulator(id: 8, name: Some(internal.metrics.executorRunTime), value: 91),LongAccumulator(id: 9, name: Some(internal.metrics.executorCpuTime), value: 78125000),LongAccumulator(id: 10, name: Some(internal.metrics.resultSize), value: 1299),LongAccumulator(id: 11, name: Some(internal.metrics.jvmGCTime), value: 0),LongAccumulator(id: 12, name: Some(internal.metrics.resultSerializationTime), value: 0),LongAccumulator(id: 13, name: Some(internal.metrics.memoryBytesSpilled), value: 0),LongAccumulator(id: 14, name: Some(internal.metrics.diskBytesSpilled), value: 0),LongAccumulator(id: 15, name: Some(internal.metrics.peakExecutionMemory), value: 0),CollectionAccumulator(id: 16, name: Some(internal.metrics.updatedBlockStatuses), value: []),LongAccumulator(id: 17, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0),LongAccumulator(id: 18, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 0),LongAccumulator(id: 19, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0),LongAccumulator(id: 20, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 0),LongAccumulator(id: 21, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0),LongAccumulator(id: 22, name: Some(internal.metrics.shuffle.read.recordsRead), value: 0),LongAccumulator(id: 23, name: Some(internal.metrics.shuffle.write.bytesWritten), value: 0),LongAccumulator(id: 24, name: Some(internal.metrics.shuffle.write.recordsWritten), value: 0),LongAccumulator(id: 25, name: Some(internal.metrics.shuffle.write.writeTime), value: 0),LongAccumulator(id: 26, name: 
Some(internal.metrics.input.bytesRead), value: 6512496),LongAccumulator(id: 27, name: Some(internal.metrics.input.recordsRead), value: 2),LongAccumulator(id: 28, name: Some(internal.metrics.output.bytesWritten), value: 0),LongAccumulator(id: 29, name: Some(internal.metrics.output.recordsWritten), value: 0)taskmetric__fgf__accu26 -> AccumulableInfo(26,Some(internal.metrics.input.bytesRead),None,Some(6512496),true,true,None),8 -> AccumulableInfo(8,Some(internal.metrics.executorRunTime),None,Some(91),true,true,None),2 -> AccumulableInfo(2,Some(number of output rows),None,Some(2),true,true,Some(sql)),7 -> AccumulableInfo(7,Some(internal.metrics.executorDeserializeCpuTime),None,Some(15625000),true,true,None),1 -> AccumulableInfo(1,Some(number of output rows),None,Some(2),true,true,Some(sql)),10 -> AccumulableInfo(10,Some(internal.metrics.resultSize),None,Some(1299),true,true,None),27 -> AccumulableInfo(27,Some(internal.metrics.input.recordsRead),None,Some(2),true,true,None),9 -> AccumulableInfo(9,Some(internal.metrics.executorCpuTime),None,Some(78125000),true,true,None),6 -> AccumulableInfo(6,Some(internal.metrics.executorDeserializeTime),None,Some(23),true,true,None)accu__fgf__si.numTasks 1>>>
====================onStageCompleted====================
====================onJobEnd====================
jobEnd.jobId  0
jobEnd.jobResult  JobSucceeded
====================onJobEnd====================
====================onJobStart====================
jobStart.jobId = 1
stageId  1
<<<stageId  1__fgf__rddname MapPartitionsRDD,MapPartitionsRDD,MapPartitionsRDD,FileScanRDD,MapPartitionsRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__accuaccu__fgf__si.numTasks 4>>>
====================onJobStart====================
====================onStageSubmitted====================
<<<stageId  1__fgf__rddname MapPartitionsRDD,MapPartitionsRDD,MapPartitionsRDD,FileScanRDD,MapPartitionsRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__accuaccu__fgf__si.numTasks 4>>>
====================onStageSubmitted====================
====================onTaskStart====================
stageAttempId 0
stageId 1
taskInfo
====================onTaskStart====================
====================onTaskStart====================
stageAttempId 0
stageId 1
taskInfo
====================onTaskStart====================
====================onTaskStart====================
stageAttempId 0
stageId 1
taskInfo
====================onTaskStart====================
====================onTaskStart====================
stageAttempId 0
stageId 1
taskInfo
====================onTaskStart====================
[Stage 1:>                                                          (0 + 4) / 4]====================onTaskEnd====================
taskend reason Success
stageId sid 1
taskInfo AccumulableInfo(54,Some(number of output rows),Some(103612),Some(103612),true,true,Some(sql))<==>AccumulableInfo(58,Some(duration total (min, med, max)),Some(644),Some(643),true,true,Some(sql))<==>AccumulableInfo(80,Some(internal.metrics.input.recordsRead),Some(103612),Some(103612),true,true,None)<==>AccumulableInfo(79,Some(internal.metrics.input.bytesRead),Some(2252658),Some(2252658),true,true,None)<==>AccumulableInfo(65,Some(internal.metrics.resultSerializationTime),Some(1),Some(1),true,true,None)<==>AccumulableInfo(64,Some(internal.metrics.jvmGCTime),Some(13),Some(13),true,true,None)<==>AccumulableInfo(63,Some(internal.metrics.resultSize),Some(1527),Some(1527),true,true,None)<==>AccumulableInfo(62,Some(internal.metrics.executorCpuTime),Some(234375000),Some(234375000),true,true,None)<==>AccumulableInfo(61,Some(internal.metrics.executorRunTime),Some(654),Some(654),true,true,None)<==>AccumulableInfo(59,Some(internal.metrics.executorDeserializeTime),Some(18),Some(18),true,true,None)
taskMetrics LongAccumulator(id: 59, name: Some(internal.metrics.executorDeserializeTime), value: 18)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 61, name: Some(internal.metrics.executorRunTime), value: 654)<==>LongAccumulator(id: 62, name: Some(internal.metrics.executorCpuTime), value: 234375000)<==>LongAccumulator(id: 63, name: Some(internal.metrics.resultSize), value: 1527)<==>LongAccumulator(id: 64, name: Some(internal.metrics.jvmGCTime), value: 13)<==>LongAccumulator(id: 65, name: Some(internal.metrics.resultSerializationTime), value: 1)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 79, name: Some(internal.metrics.input.bytesRead), value: 2252658)<==>LongAccumulator(id: 80, name: Some(internal.metrics.input.recordsRead), value: 103612)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 58, name: Some(duration total (min, med, max)), value: 644)<==>SQLMetric(id: 54, name: Some(number of output rows), value: 103612)<==>SQLMetric(id: 55, name: Some(number of files), value: 0)<==>SQLMetric(id: 56, name: Some(metadata time (ms)), value: 0)<==>SQLMetric(id: 57, name: Some(scan time total (min, med, max)), value: -1)
====================onTaskEnd====================
====================onExecutorMetricsUpdate====================
execId   driver
1    1    AccumulableInfo(54,Some(number of output rows),Some(159340),None,true,true,Some(sql))
1    2    AccumulableInfo(54,Some(number of output rows),Some(166588),None,true,true,Some(sql))
1    3    AccumulableInfo(54,Some(number of output rows),Some(163819),None,true,true,Some(sql))
====================onExecutorMetricsUpdate====================
[Stage 1:==============>                                            (1 + 3) / 4]====================onTaskEnd====================
taskend reason Success
stageId sid 1
taskInfo AccumulableInfo(54,Some(number of output rows),Some(296289),Some(399901),true,true,Some(sql))<==>AccumulableInfo(58,Some(duration total (min, med, max)),Some(1153),Some(1796),true,true,Some(sql))<==>AccumulableInfo(80,Some(internal.metrics.input.recordsRead),Some(296289),Some(399901),true,true,None)<==>AccumulableInfo(79,Some(internal.metrics.input.bytesRead),Some(6553600),Some(8806258),true,true,None)<==>AccumulableInfo(64,Some(internal.metrics.jvmGCTime),Some(23),Some(36),true,true,None)<==>AccumulableInfo(63,Some(internal.metrics.resultSize),Some(1484),Some(3011),true,true,None)<==>AccumulableInfo(62,Some(internal.metrics.executorCpuTime),Some(609375000),Some(843750000),true,true,None)<==>AccumulableInfo(61,Some(internal.metrics.executorRunTime),Some(1164),Some(1818),true,true,None)<==>AccumulableInfo(59,Some(internal.metrics.executorDeserializeTime),Some(10),Some(28),true,true,None)
taskMetrics LongAccumulator(id: 59, name: Some(internal.metrics.executorDeserializeTime), value: 10)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 61, name: Some(internal.metrics.executorRunTime), value: 1164)<==>LongAccumulator(id: 62, name: Some(internal.metrics.executorCpuTime), value: 609375000)<==>LongAccumulator(id: 63, name: Some(internal.metrics.resultSize), value: 1484)<==>LongAccumulator(id: 64, name: Some(internal.metrics.jvmGCTime), value: 23)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 79, name: Some(internal.metrics.input.bytesRead), value: 6553600)<==>LongAccumulator(id: 80, name: Some(internal.metrics.input.recordsRead), value: 296289)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 58, name: Some(duration total (min, med, max)), value: 1153)<==>SQLMetric(id: 54, name: Some(number of output rows), value: 296289)<==>SQLMetric(id: 55, name: Some(number of files), value: 0)<==>SQLMetric(id: 56, name: Some(metadata time (ms)), value: 0)<==>SQLMetric(id: 57, name: Some(scan time total (min, med, max)), value: -1)
====================onTaskEnd====================
====================onTaskEnd====================
taskend reason Success
stageId sid 1
taskInfo
taskMetrics LongAccumulator(id: 59, name: Some(internal.metrics.executorDeserializeTime), value: 13)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 61, name: Some(internal.metrics.executorRunTime), value: 1164)<==>LongAccumulator(id: 62, name: Some(internal.metrics.executorCpuTime), value: 625000000)<==>LongAccumulator(id: 63, name: Some(internal.metrics.resultSize), value: 1484)<==>LongAccumulator(id: 64, name: Some(internal.metrics.jvmGCTime), value: 23)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 79, name: Some(internal.metrics.input.bytesRead), value: 6553600)<==>LongAccumulator(id: 80, name: Some(internal.metrics.input.recordsRead), value: 296402)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 58, name: Some(duration total (min, med, max)), value: 1153)<==>SQLMetric(id: 54, name: Some(number of output rows), value: 296402)<==>SQLMetric(id: 55, name: Some(number of files), value: 0)<==>SQLMetric(id: 56, name: Some(metadata time (ms)), value: 0)<==>SQLMetric(id: 57, name: Some(scan time total (min, med, max)), value: -1)
====================onTaskEnd====================
====================onTaskEnd====================
taskend reason Success
stageId sid 1
taskInfo AccumulableInfo(54,Some(number of output rows),Some(303907),Some(1000210),true,true,Some(sql))<==>AccumulableInfo(58,Some(duration total (min, med, max)),Some(1172),Some(4121),true,true,Some(sql))<==>AccumulableInfo(80,Some(internal.metrics.input.recordsRead),Some(303907),Some(1000210),true,true,None)<==>AccumulableInfo(79,Some(internal.metrics.input.bytesRead),Some(6553600),Some(21913458),true,true,None)<==>AccumulableInfo(64,Some(internal.metrics.jvmGCTime),Some(23),Some(82),true,true,None)<==>AccumulableInfo(63,Some(internal.metrics.resultSize),Some(1527),Some(6022),true,true,None)<==>AccumulableInfo(62,Some(internal.metrics.executorCpuTime),Some(765625000),Some(2234375000),true,true,None)<==>AccumulableInfo(61,Some(internal.metrics.executorRunTime),Some(1188),Some(4170),true,true,None)<==>AccumulableInfo(60,Some(internal.metrics.executorDeserializeCpuTime),Some(15625000),Some(15625000),true,true,None)<==>AccumulableInfo(59,Some(internal.metrics.executorDeserializeTime),Some(9),Some(50),true,true,None)
taskMetrics LongAccumulator(id: 59, name: Some(internal.metrics.executorDeserializeTime), value: 9)<==>LongAccumulator(id: 60, name: Some(internal.metrics.executorDeserializeCpuTime), value: 15625000)<==>LongAccumulator(id: 61, name: Some(internal.metrics.executorRunTime), value: 1188)<==>LongAccumulator(id: 62, name: Some(internal.metrics.executorCpuTime), value: 765625000)<==>LongAccumulator(id: 63, name: Some(internal.metrics.resultSize), value: 1527)<==>LongAccumulator(id: 64, name: Some(internal.metrics.jvmGCTime), value: 23)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 79, name: Some(internal.metrics.input.bytesRead), value: 6553600)<==>LongAccumulator(id: 80, name: Some(internal.metrics.input.recordsRead), value: 303907)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 58, name: Some(duration total (min, med, max)), value: 1172)<==>SQLMetric(id: 54, name: Some(number of output rows), value: 303907)<==>SQLMetric(id: 55, name: Some(number of files), value: 0)<==>SQLMetric(id: 56, name: Some(metadata time (ms)), value: 0)<==>SQLMetric(id: 57, name: Some(scan time total (min, med, max)), value: -1)
====================onTaskEnd====================
====================onStageCompleted====================
stageId 1
<<<stageId  1__fgf__rddname MapPartitionsRDD,MapPartitionsRDD,MapPartitionsRDD,FileScanRDD,MapPartitionsRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__taskmetricLongAccumulator(id: 59, name: Some(internal.metrics.executorDeserializeTime), value: 50),LongAccumulator(id: 60, name: Some(internal.metrics.executorDeserializeCpuTime), value: 15625000),LongAccumulator(id: 61, name: Some(internal.metrics.executorRunTime), value: 4170),LongAccumulator(id: 62, name: Some(internal.metrics.executorCpuTime), value: 2234375000),LongAccumulator(id: 63, name: Some(internal.metrics.resultSize), value: 6022),LongAccumulator(id: 64, name: Some(internal.metrics.jvmGCTime), value: 82),LongAccumulator(id: 65, name: Some(internal.metrics.resultSerializationTime), value: 1),LongAccumulator(id: 66, name: Some(internal.metrics.memoryBytesSpilled), value: 0),LongAccumulator(id: 67, name: Some(internal.metrics.diskBytesSpilled), value: 0),LongAccumulator(id: 68, name: Some(internal.metrics.peakExecutionMemory), value: 0),CollectionAccumulator(id: 69, name: Some(internal.metrics.updatedBlockStatuses), value: []),LongAccumulator(id: 70, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0),LongAccumulator(id: 71, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 0),LongAccumulator(id: 72, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0),LongAccumulator(id: 73, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 0),LongAccumulator(id: 74, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0),LongAccumulator(id: 75, name: Some(internal.metrics.shuffle.read.recordsRead), value: 0),LongAccumulator(id: 76, name: Some(internal.metrics.shuffle.write.bytesWritten), value: 0),LongAccumulator(id: 77, name: Some(internal.metrics.shuffle.write.recordsWritten), value: 0),LongAccumulator(id: 78, name: Some(internal.metrics.shuffle.write.writeTime), 
value: 0),LongAccumulator(id: 79, name: Some(internal.metrics.input.bytesRead), value: 21913458),LongAccumulator(id: 80, name: Some(internal.metrics.input.recordsRead), value: 1000210),LongAccumulator(id: 81, name: Some(internal.metrics.output.bytesWritten), value: 0),LongAccumulator(id: 82, name: Some(internal.metrics.output.recordsWritten), value: 0)taskmetric__fgf__accu59 -> AccumulableInfo(59,Some(internal.metrics.executorDeserializeTime),None,Some(50),true,true,None),80 -> AccumulableInfo(80,Some(internal.metrics.input.recordsRead),None,Some(1000210),true,true,None),62 -> AccumulableInfo(62,Some(internal.metrics.executorCpuTime),None,Some(2234375000),true,true,None),65 -> AccumulableInfo(65,Some(internal.metrics.resultSerializationTime),None,Some(1),true,true,None),64 -> AccumulableInfo(64,Some(internal.metrics.jvmGCTime),None,Some(82),true,true,None),58 -> AccumulableInfo(58,Some(duration total (min, med, max)),None,Some(4121),true,true,Some(sql)),79 -> AccumulableInfo(79,Some(internal.metrics.input.bytesRead),None,Some(21913458),true,true,None),61 -> AccumulableInfo(61,Some(internal.metrics.executorRunTime),None,Some(4170),true,true,None),60 -> AccumulableInfo(60,Some(internal.metrics.executorDeserializeCpuTime),None,Some(15625000),true,true,None),54 -> AccumulableInfo(54,Some(number of output rows),None,Some(1000210),true,true,Some(sql)),63 -> AccumulableInfo(63,Some(internal.metrics.resultSize),None,Some(6022),true,true,None)accu__fgf__si.numTasks 4>>>
====================onStageCompleted====================
====================onJobEnd====================
jobEnd.jobId  1
jobEnd.jobResult  JobSucceeded
====================onJobEnd====================
====================onJobStart====================
jobStart.jobId = 2
stageId  2
<<<stageId  2__fgf__rddname MapPartitionsRDD,FileScanRDD,MapPartitionsRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__accuaccu__fgf__si.numTasks 1>>>
====================onJobStart====================
====================onStageSubmitted====================
<<<stageId  2__fgf__rddname MapPartitionsRDD,FileScanRDD,MapPartitionsRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__accuaccu__fgf__si.numTasks 1>>>
====================onStageSubmitted====================
====================onTaskStart====================
stageAttempId 0
stageId 2
taskInfo
====================onTaskStart====================
====================onTaskEnd====================
taskend reason Success
stageId sid 2
taskInfo AccumulableInfo(84,Some(number of output rows),Some(22),Some(22),true,true,Some(sql))<==>AccumulableInfo(109,Some(internal.metrics.input.recordsRead),Some(22),Some(22),true,true,None)<==>AccumulableInfo(108,Some(internal.metrics.input.bytesRead),Some(6512496),Some(6512496),true,true,None)<==>AccumulableInfo(92,Some(internal.metrics.resultSize),Some(1310),Some(1310),true,true,None)<==>AccumulableInfo(91,Some(internal.metrics.executorCpuTime),Some(31250000),Some(31250000),true,true,None)<==>AccumulableInfo(90,Some(internal.metrics.executorRunTime),Some(25),Some(25),true,true,None)<==>AccumulableInfo(88,Some(internal.metrics.executorDeserializeTime),Some(4),Some(4),true,true,None)
taskMetrics LongAccumulator(id: 88, name: Some(internal.metrics.executorDeserializeTime), value: 4)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 90, name: Some(internal.metrics.executorRunTime), value: 25)<==>LongAccumulator(id: 91, name: Some(internal.metrics.executorCpuTime), value: 31250000)<==>LongAccumulator(id: 92, name: Some(internal.metrics.resultSize), value: 1310)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 108, name: Some(internal.metrics.input.bytesRead), value: 6512496)<==>LongAccumulator(id: 109, name: Some(internal.metrics.input.recordsRead), value: 22)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 83, name: Some(duration total (min, med, max)), value: -1)<==>SQLMetric(id: 84, name: Some(number of output rows), value: 22)
====================onTaskEnd====================
====================onStageCompleted====================
stageId 2
<<<stageId  2__fgf__rddname MapPartitionsRDD,FileScanRDD,MapPartitionsRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__taskmetricLongAccumulator(id: 88, name: Some(internal.metrics.executorDeserializeTime), value: 4),LongAccumulator(id: 89, name: Some(internal.metrics.executorDeserializeCpuTime), value: 0),LongAccumulator(id: 90, name: Some(internal.metrics.executorRunTime), value: 25),LongAccumulator(id: 91, name: Some(internal.metrics.executorCpuTime), value: 31250000),LongAccumulator(id: 92, name: Some(internal.metrics.resultSize), value: 1310),LongAccumulator(id: 93, name: Some(internal.metrics.jvmGCTime), value: 0),LongAccumulator(id: 94, name: Some(internal.metrics.resultSerializationTime), value: 0),LongAccumulator(id: 95, name: Some(internal.metrics.memoryBytesSpilled), value: 0),LongAccumulator(id: 96, name: Some(internal.metrics.diskBytesSpilled), value: 0),LongAccumulator(id: 97, name: Some(internal.metrics.peakExecutionMemory), value: 0),CollectionAccumulator(id: 98, name: Some(internal.metrics.updatedBlockStatuses), value: []),LongAccumulator(id: 99, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0),LongAccumulator(id: 100, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 0),LongAccumulator(id: 101, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0),LongAccumulator(id: 102, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 0),LongAccumulator(id: 103, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0),LongAccumulator(id: 104, name: Some(internal.metrics.shuffle.read.recordsRead), value: 0),LongAccumulator(id: 105, name: Some(internal.metrics.shuffle.write.bytesWritten), value: 0),LongAccumulator(id: 106, name: Some(internal.metrics.shuffle.write.recordsWritten), value: 0),LongAccumulator(id: 107, name: Some(internal.metrics.shuffle.write.writeTime), value: 0),LongAccumulator(id: 108, name: 
Some(internal.metrics.input.bytesRead), value: 6512496),LongAccumulator(id: 109, name: Some(internal.metrics.input.recordsRead), value: 22),LongAccumulator(id: 110, name: Some(internal.metrics.output.bytesWritten), value: 0),LongAccumulator(id: 111, name: Some(internal.metrics.output.recordsWritten), value: 0)taskmetric__fgf__accu92 -> AccumulableInfo(92,Some(internal.metrics.resultSize),None,Some(1310),true,true,None),109 -> AccumulableInfo(109,Some(internal.metrics.input.recordsRead),None,Some(22),true,true,None),91 -> AccumulableInfo(91,Some(internal.metrics.executorCpuTime),None,Some(31250000),true,true,None),88 -> AccumulableInfo(88,Some(internal.metrics.executorDeserializeTime),None,Some(4),true,true,None),108 -> AccumulableInfo(108,Some(internal.metrics.input.bytesRead),None,Some(6512496),true,true,None),90 -> AccumulableInfo(90,Some(internal.metrics.executorRunTime),None,Some(25),true,true,None),84 -> AccumulableInfo(84,Some(number of output rows),None,Some(22),true,true,Some(sql))accu__fgf__si.numTasks 1>>>
====================onStageCompleted====================
====================onJobEnd====================
jobEnd.jobId  2
jobEnd.jobResult  JobSucceeded
====================onJobEnd====================
+--------+
|movieids|
+--------+
|    1193|
|     661|
|     914|
|    3408|
|    2355|
|    1197|
|    1287|
|    2804|
|     594|
|     919|
|     595|
|     938|
|    2398|
|    2918|
|    1035|
|    2791|
|    2687|
|    2018|
|    3105|
|    2797|
+--------+
only showing top 20 rows
 
====================onJobStart====================
jobStart.jobId = 3
stageId  3
stageId  4
<<<stageId  3__fgf__rddname MapPartitionsRDD,FileScanRDD,MapPartitionsRDD,MapPartitionsRDD,MapPartitionsRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__accuaccu__fgf__si.numTasks 4>>>
<<<stageId  4__fgf__rddname MapPartitionsRDD,xxxxxxxxxxxxxxxxxxxxxxxxx,ShuffledRDD,CoalescedRDD,MapPartitionsRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__accuaccu__fgf__si.numTasks 5>>>
====================onJobStart====================
====================onStageSubmitted====================
<<<stageId  3__fgf__rddname MapPartitionsRDD,FileScanRDD,MapPartitionsRDD,MapPartitionsRDD,MapPartitionsRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__accuaccu__fgf__si.numTasks 4>>>
====================onStageSubmitted====================
====================onTaskStart====================
stageAttempId 0
stageId 3
taskInfo
====================onTaskStart====================
====================onTaskStart====================
stageAttempId 0
stageId 3
taskInfo
====================onTaskStart====================
====================onTaskStart====================
stageAttempId 0
stageId 3
taskInfo
====================onTaskStart====================
====================onTaskStart====================
stageAttempId 0
stageId 3
taskInfo
====================onTaskStart====================
====================onExecutorMetricsUpdate====================
execId   driver
3    6    AccumulableInfo(112,Some(number of output rows),Some(10394),None,true,true,Some(sql))
3    7    AccumulableInfo(112,Some(number of output rows),Some(7983),None,true,true,Some(sql))
3    8    AccumulableInfo(112,Some(number of output rows),Some(9630),None,true,true,Some(sql))
3    9    AccumulableInfo(112,Some(number of output rows),Some(10079),None,true,true,Some(sql))
====================onExecutorMetricsUpdate====================
[Stage 3:>                                                          (0 + 4) / 4]====================onExecutorMetricsUpdate====================
execId   driver
3    6    AccumulableInfo(112,Some(number of output rows),Some(47124),None,true,true,Some(sql))
3    7    AccumulableInfo(112,Some(number of output rows),Some(43589),None,true,true,Some(sql))
3    8    AccumulableInfo(112,Some(number of output rows),Some(48100),None,true,true,Some(sql))
3    9    AccumulableInfo(112,Some(number of output rows),Some(47160),None,true,true,Some(sql))
====================onExecutorMetricsUpdate====================
====================onTaskEnd====================
taskend reason Success
stageId sid 3
taskInfo AccumulableInfo(112,Some(number of output rows),Some(103612),Some(103612),true,true,Some(sql))<==>AccumulableInfo(116,Some(duration total (min, med, max)),Some(1617),Some(1616),true,true,Some(sql))<==>AccumulableInfo(139,Some(internal.metrics.input.recordsRead),Some(103612),Some(103612),true,true,None)<==>AccumulableInfo(138,Some(internal.metrics.input.bytesRead),Some(2252658),Some(2252658),true,true,None)<==>AccumulableInfo(137,Some(internal.metrics.shuffle.write.writeTime),Some(98373696),Some(98373696),true,true,None)<==>AccumulableInfo(136,Some(internal.metrics.shuffle.write.recordsWritten),Some(103612),Some(103612),true,true,None)<==>AccumulableInfo(135,Some(internal.metrics.shuffle.write.bytesWritten),Some(2298021),Some(2298021),true,true,None)<==>AccumulableInfo(124,Some(internal.metrics.resultSerializationTime),Some(2),Some(2),true,true,None)<==>AccumulableInfo(123,Some(internal.metrics.jvmGCTime),Some(19),Some(19),true,true,None)<==>AccumulableInfo(122,Some(internal.metrics.resultSize),Some(1628),Some(1628),true,true,None)<==>AccumulableInfo(121,Some(internal.metrics.executorCpuTime),Some(843750000),Some(843750000),true,true,None)<==>AccumulableInfo(120,Some(internal.metrics.executorRunTime),Some(1718),Some(1718),true,true,None)<==>AccumulableInfo(118,Some(internal.metrics.executorDeserializeTime),Some(6),Some(6),true,true,None)
taskMetrics LongAccumulator(id: 118, name: Some(internal.metrics.executorDeserializeTime), value: 6)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 120, name: Some(internal.metrics.executorRunTime), value: 1718)<==>LongAccumulator(id: 121, name: Some(internal.metrics.executorCpuTime), value: 843750000)<==>LongAccumulator(id: 122, name: Some(internal.metrics.resultSize), value: 1628)<==>LongAccumulator(id: 123, name: Some(internal.metrics.jvmGCTime), value: 19)<==>LongAccumulator(id: 124, name: Some(internal.metrics.resultSerializationTime), value: 2)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 135, name: Some(internal.metrics.shuffle.write.bytesWritten), value: 2298021)<==>LongAccumulator(id: 136, name: Some(internal.metrics.shuffle.write.recordsWritten), value: 103612)<==>LongAccumulator(id: 137, name: Some(internal.metrics.shuffle.write.writeTime), value: 98373696)<==>LongAccumulator(id: 138, name: Some(internal.metrics.input.bytesRead), value: 2252658)<==>LongAccumulator(id: 139, name: Some(internal.metrics.input.recordsRead), value: 103612)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 116, name: Some(duration total (min, med, max)), value: 1617)<==>SQLMetric(id: 112, name: Some(number of output rows), value: 103612)<==>SQLMetric(id: 113, name: Some(number of files), value: 0)<==>SQLMetric(id: 114, name: Some(metadata time (ms)), value: 0)<==>SQLMetric(id: 115, name: Some(scan time total (min, med, max)), value: -1)
====================onTaskEnd====================
[Stage 3:==============>                                            (1 + 3) / 4]====================onExecutorMetricsUpdate====================
execId   driver
3    6    AccumulableInfo(112,Some(number of output rows),Some(221438),None,true,true,Some(sql))
3    7    AccumulableInfo(112,Some(number of output rows),Some(219572),None,true,true,Some(sql))
3    8    AccumulableInfo(112,Some(number of output rows),Some(225414),None,true,true,Some(sql))
====================onExecutorMetricsUpdate====================
====================onTaskEnd====================
taskend reason Success
stageId sid 3
taskInfo AccumulableInfo(112,Some(number of output rows),Some(296402),Some(400014),true,true,Some(sql))<==>AccumulableInfo(116,Some(duration total (min, med, max)),Some(2541),Some(4157),true,true,Some(sql))<==>AccumulableInfo(139,Some(internal.metrics.input.recordsRead),Some(296402),Some(400014),true,true,None)<==>AccumulableInfo(138,Some(internal.metrics.input.bytesRead),Some(6553600),Some(8806258),true,true,None)<==>AccumulableInfo(137,Some(internal.metrics.shuffle.write.writeTime),Some(82364891),Some(180738587),true,true,None)<==>AccumulableInfo(136,Some(internal.metrics.shuffle.write.recordsWritten),Some(296402),Some(400014),true,true,None)<==>AccumulableInfo(135,Some(internal.metrics.shuffle.write.bytesWritten),Some(6554778),Some(8852799),true,true,None)<==>AccumulableInfo(123,Some(internal.metrics.jvmGCTime),Some(24),Some(43),true,true,None)<==>AccumulableInfo(122,Some(internal.metrics.resultSize),Some(1585),Some(3213),true,true,None)<==>AccumulableInfo(121,Some(internal.metrics.executorCpuTime),Some(1718750000),Some(2562500000),true,true,None)<==>AccumulableInfo(120,Some(internal.metrics.executorRunTime),Some(2610),Some(4328),true,true,None)<==>AccumulableInfo(118,Some(internal.metrics.executorDeserializeTime),Some(3),Some(9),true,true,None)
taskMetrics LongAccumulator(id: 118, name: Some(internal.metrics.executorDeserializeTime), value: 3)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 120, name: Some(internal.metrics.executorRunTime), value: 2610)<==>LongAccumulator(id: 121, name: Some(internal.metrics.executorCpuTime), value: 1718750000)<==>LongAccumulator(id: 122, name: Some(internal.metrics.resultSize), value: 1585)<==>LongAccumulator(id: 123, name: Some(internal.metrics.jvmGCTime), value: 24)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 135, name: Some(internal.metrics.shuffle.write.bytesWritten), value: 6554778)<==>LongAccumulator(id: 136, name: Some(internal.metrics.shuffle.write.recordsWritten), value: 296402)<==>LongAccumulator(id: 137, name: Some(internal.metrics.shuffle.write.writeTime), value: 82364891)<==>LongAccumulator(id: 138, name: Some(internal.metrics.input.bytesRead), value: 6553600)<==>LongAccumulator(id: 139, name: Some(internal.metrics.input.recordsRead), value: 296402)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 116, name: Some(duration total (min, med, max)), value: 2541)<==>SQLMetric(id: 112, name: Some(number of output rows), value: 296402)<==>SQLMetric(id: 113, name: Some(number of files), value: 0)<==>SQLMetric(id: 114, name: Some(metadata time (ms)), value: 0)<==>SQLMetric(id: 115, name: Some(scan time total (min, med, max)), value: -1)
====================onTaskEnd====================
[Stage 3:=============================>                             (2 + 2) / 4]====================onTaskEnd====================
taskend reason Success
stageId sid 3
taskInfo AccumulableInfo(112,Some(number of output rows),Some(296289),Some(696303),true,true,Some(sql))<==>AccumulableInfo(116,Some(duration total (min, med, max)),Some(2578),Some(6735),true,true,Some(sql))<==>AccumulableInfo(139,Some(internal.metrics.input.recordsRead),Some(296289),Some(696303),true,true,None)<==>AccumulableInfo(138,Some(internal.metrics.input.bytesRead),Some(6553600),Some(15359858),true,true,None)<==>AccumulableInfo(137,Some(internal.metrics.shuffle.write.writeTime),Some(91972656),Some(272711243),true,true,None)<==>AccumulableInfo(136,Some(internal.metrics.shuffle.write.recordsWritten),Some(296289),Some(696303),true,true,None)<==>AccumulableInfo(135,Some(internal.metrics.shuffle.write.bytesWritten),Some(6576036),Some(15428835),true,true,None)<==>AccumulableInfo(123,Some(internal.metrics.jvmGCTime),Some(24),Some(67),true,true,None)<==>AccumulableInfo(122,Some(internal.metrics.resultSize),Some(1585),Some(4798),true,true,None)<==>AccumulableInfo(121,Some(internal.metrics.executorCpuTime),Some(1750000000),Some(4312500000),true,true,None)<==>AccumulableInfo(120,Some(internal.metrics.executorRunTime),Some(2661),Some(6989),true,true,None)<==>AccumulableInfo(118,Some(internal.metrics.executorDeserializeTime),Some(7),Some(16),true,true,None)
taskMetrics LongAccumulator(id: 118, name: Some(internal.metrics.executorDeserializeTime), value: 7)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 120, name: Some(internal.metrics.executorRunTime), value: 2661)<==>LongAccumulator(id: 121, name: Some(internal.metrics.executorCpuTime), value: 1750000000)<==>LongAccumulator(id: 122, name: Some(internal.metrics.resultSize), value: 1585)<==>LongAccumulator(id: 123, name: Some(internal.metrics.jvmGCTime), value: 24)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 135, name: Some(internal.metrics.shuffle.write.bytesWritten), value: 6576036)<==>LongAccumulator(id: 136, name: Some(internal.metrics.shuffle.write.recordsWritten), value: 296289)<==>LongAccumulator(id: 137, name: Some(internal.metrics.shuffle.write.writeTime), value: 91972656)<==>LongAccumulator(id: 138, name: Some(internal.metrics.input.bytesRead), value: 6553600)<==>LongAccumulator(id: 139, name: Some(internal.metrics.input.recordsRead), value: 296289)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 116, name: Some(duration total (min, med, max)), value: 2578)<==>SQLMetric(id: 112, name: Some(number of output rows), value: 296289)<==>SQLMetric(id: 113, name: Some(number of files), value: 0)<==>SQLMetric(id: 114, name: Some(metadata time (ms)), value: 0)<==>SQLMetric(id: 115, name: Some(scan time total (min, med, max)), value: -1)
====================onTaskEnd====================
====================onTaskEnd====================
taskend reason Success
stageId sid 3
taskInfo AccumulableInfo(112,Some(number of output rows),Some(303906),Some(1000209),true,true,Some(sql))<==>AccumulableInfo(116,Some(duration total (min, med, max)),Some(2600),Some(9335),true,true,Some(sql))<==>AccumulableInfo(139,Some(internal.metrics.input.recordsRead),Some(303906),Some(1000209),true,true,None)<==>AccumulableInfo(138,Some(internal.metrics.input.bytesRead),Some(6553600),Some(21913458),true,true,None)<==>AccumulableInfo(137,Some(internal.metrics.shuffle.write.writeTime),Some(80225232),Some(352936475),true,true,None)<==>AccumulableInfo(136,Some(internal.metrics.shuffle.write.recordsWritten),Some(303906),Some(1000209),true,true,None)<==>AccumulableInfo(135,Some(internal.metrics.shuffle.write.bytesWritten),Some(6745230),Some(22174065),true,true,None)<==>AccumulableInfo(123,Some(internal.metrics.jvmGCTime),Some(24),Some(91),true,true,None)<==>AccumulableInfo(122,Some(internal.metrics.resultSize),Some(1585),Some(6383),true,true,None)<==>AccumulableInfo(121,Some(internal.metrics.executorCpuTime),Some(1765625000),Some(6078125000),true,true,None)<==>AccumulableInfo(120,Some(internal.metrics.executorRunTime),Some(2674),Some(9663),true,true,None)<==>AccumulableInfo(118,Some(internal.metrics.executorDeserializeTime),Some(5),Some(21),true,true,None)
taskMetrics LongAccumulator(id: 118, name: Some(internal.metrics.executorDeserializeTime), value: 5)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 120, name: Some(internal.metrics.executorRunTime), value: 2674)<==>LongAccumulator(id: 121, name: Some(internal.metrics.executorCpuTime), value: 1765625000)<==>LongAccumulator(id: 122, name: Some(internal.metrics.resultSize), value: 1585)<==>LongAccumulator(id: 123, name: Some(internal.metrics.jvmGCTime), value: 24)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 135, name: Some(internal.metrics.shuffle.write.bytesWritten), value: 6745230)<==>LongAccumulator(id: 136, name: Some(internal.metrics.shuffle.write.recordsWritten), value: 303906)<==>LongAccumulator(id: 137, name: Some(internal.metrics.shuffle.write.writeTime), value: 80225232)<==>LongAccumulator(id: 138, name: Some(internal.metrics.input.bytesRead), value: 6553600)<==>LongAccumulator(id: 139, name: Some(internal.metrics.input.recordsRead), value: 303906)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 116, name: Some(duration total (min, med, max)), value: 2600)<==>SQLMetric(id: 112, name: Some(number of output rows), value: 303906)<==>SQLMetric(id: 113, name: Some(number of files), value: 0)<==>SQLMetric(id: 114, name: Some(metadata time (ms)), value: 0)<==>SQLMetric(id: 115, name: Some(scan time total (min, med, max)), value: -1)
====================onTaskEnd====================
====================onStageCompleted====================
stageId 3
<<<stageId  3__fgf__rddname MapPartitionsRDD,FileScanRDD,MapPartitionsRDD,MapPartitionsRDD,MapPartitionsRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__taskmetricLongAccumulator(id: 118, name: Some(internal.metrics.executorDeserializeTime), value: 21),LongAccumulator(id: 119, name: Some(internal.metrics.executorDeserializeCpuTime), value: 0),LongAccumulator(id: 120, name: Some(internal.metrics.executorRunTime), value: 9663),LongAccumulator(id: 121, name: Some(internal.metrics.executorCpuTime), value: 6078125000),LongAccumulator(id: 122, name: Some(internal.metrics.resultSize), value: 6383),LongAccumulator(id: 123, name: Some(internal.metrics.jvmGCTime), value: 91),LongAccumulator(id: 124, name: Some(internal.metrics.resultSerializationTime), value: 2),LongAccumulator(id: 125, name: Some(internal.metrics.memoryBytesSpilled), value: 0),LongAccumulator(id: 126, name: Some(internal.metrics.diskBytesSpilled), value: 0),LongAccumulator(id: 127, name: Some(internal.metrics.peakExecutionMemory), value: 0),CollectionAccumulator(id: 128, name: Some(internal.metrics.updatedBlockStatuses), value: []),LongAccumulator(id: 129, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0),LongAccumulator(id: 130, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 0),LongAccumulator(id: 131, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0),LongAccumulator(id: 132, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 0),LongAccumulator(id: 133, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0),LongAccumulator(id: 134, name: Some(internal.metrics.shuffle.read.recordsRead), value: 0),LongAccumulator(id: 135, name: Some(internal.metrics.shuffle.write.bytesWritten), value: 22174065),LongAccumulator(id: 136, name: Some(internal.metrics.shuffle.write.recordsWritten), value: 1000209),LongAccumulator(id: 137, name: 
Some(internal.metrics.shuffle.write.writeTime), value: 352936475),LongAccumulator(id: 138, name: Some(internal.metrics.input.bytesRead), value: 21913458),LongAccumulator(id: 139, name: Some(internal.metrics.input.recordsRead), value: 1000209),LongAccumulator(id: 140, name: Some(internal.metrics.output.bytesWritten), value: 0),LongAccumulator(id: 141, name: Some(internal.metrics.output.recordsWritten), value: 0)taskmetric__fgf__accu137 -> AccumulableInfo(137,Some(internal.metrics.shuffle.write.writeTime),None,Some(352936475),true,true,None),122 -> AccumulableInfo(122,Some(internal.metrics.resultSize),None,Some(6383),true,true,None),116 -> AccumulableInfo(116,Some(duration total (min, med, max)),None,Some(9335),true,true,Some(sql)),124 -> AccumulableInfo(124,Some(internal.metrics.resultSerializationTime),None,Some(2),true,true,None),118 -> AccumulableInfo(118,Some(internal.metrics.executorDeserializeTime),None,Some(21),true,true,None),136 -> AccumulableInfo(136,Some(internal.metrics.shuffle.write.recordsWritten),None,Some(1000209),true,true,None),139 -> AccumulableInfo(139,Some(internal.metrics.input.recordsRead),None,Some(1000209),true,true,None),121 -> AccumulableInfo(121,Some(internal.metrics.executorCpuTime),None,Some(6078125000),true,true,None),112 -> AccumulableInfo(112,Some(number of output rows),None,Some(1000209),true,true,Some(sql)),120 -> AccumulableInfo(120,Some(internal.metrics.executorRunTime),None,Some(9663),true,true,None),138 -> AccumulableInfo(138,Some(internal.metrics.input.bytesRead),None,Some(21913458),true,true,None),123 -> AccumulableInfo(123,Some(internal.metrics.jvmGCTime),None,Some(91),true,true,None),135 -> AccumulableInfo(135,Some(internal.metrics.shuffle.write.bytesWritten),None,Some(22174065),true,true,None)accu__fgf__si.numTasks 4>>>
====================onStageCompleted====================
====================onStageSubmitted====================
<<<stageId  4__fgf__rddname MapPartitionsRDD,xxxxxxxxxxxxxxxxxxxxxxxxx,ShuffledRDD,CoalescedRDD,MapPartitionsRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__accuaccu__fgf__si.numTasks 5>>>
====================onStageSubmitted====================
====================onTaskStart====================
stageAttempId 0
stageId 4
taskInfo
====================onTaskStart====================
====================onTaskStart====================
stageAttempId 0
stageId 4
taskInfo
====================onTaskStart====================
====================onTaskStart====================
stageAttempId 0
stageId 4
taskInfo
====================onTaskStart====================
====================onTaskStart====================
stageAttempId 0
stageId 4
taskInfo
====================onTaskStart====================
[Stage 4:>                                                          (0 + 4) / 5]====================onExecutorMetricsUpdate====================
execId   driver
4    10    AccumulableInfo(117,Some(number of output rows),Some(8707),None,true,true,Some(sql))
4    11    AccumulableInfo(117,Some(number of output rows),Some(12415),None,true,true,Some(sql))
4    12    AccumulableInfo(117,Some(number of output rows),Some(11606),None,true,true,Some(sql))
4    13    AccumulableInfo(117,Some(number of output rows),Some(9539),None,true,true,Some(sql))
====================onExecutorMetricsUpdate====================
====================onExecutorMetricsUpdate====================
execId   driver
4    10    AccumulableInfo(117,Some(number of output rows),Some(50731),None,true,true,Some(sql))
4    11    AccumulableInfo(117,Some(number of output rows),Some(50721),None,true,true,Some(sql))
4    12    AccumulableInfo(117,Some(number of output rows),Some(51519),None,true,true,Some(sql))
4    13    AccumulableInfo(117,Some(number of output rows),Some(55871),None,true,true,Some(sql))
====================onExecutorMetricsUpdate====================
====================onTaskStart====================
stageAttempId 0
stageId 4
taskInfo
====================onTaskStart====================
====================onTaskEnd====================
taskend reason Success
stageId sid 4
taskInfo AccumulableInfo(117,Some(number of output rows),Some(200041),Some(200041),true,true,Some(sql))<==>AccumulableInfo(158,Some(internal.metrics.shuffle.read.recordsRead),Some(200041),Some(200041),true,true,None)<==>AccumulableInfo(157,Some(internal.metrics.shuffle.read.fetchWaitTime),Some(0),Some(0),true,true,None)<==>AccumulableInfo(156,Some(internal.metrics.shuffle.read.localBytesRead),Some(4436289),Some(4436289),true,true,None)<==>AccumulableInfo(155,Some(internal.metrics.shuffle.read.remoteBytesRead),Some(0),Some(0),true,true,None)<==>AccumulableInfo(154,Some(internal.metrics.shuffle.read.localBlocksFetched),Some(4),Some(4),true,true,None)<==>AccumulableInfo(153,Some(internal.metrics.shuffle.read.remoteBlocksFetched),Some(0),Some(0),true,true,None)<==>AccumulableInfo(148,Some(internal.metrics.resultSerializationTime),Some(2),Some(2),true,true,None)<==>AccumulableInfo(147,Some(internal.metrics.jvmGCTime),Some(46),Some(46),true,true,None)<==>AccumulableInfo(146,Some(internal.metrics.resultSize),Some(1686),Some(1686),true,true,None)<==>AccumulableInfo(145,Some(internal.metrics.executorCpuTime),Some(1203125000),Some(1203125000),true,true,None)<==>AccumulableInfo(144,Some(internal.metrics.executorRunTime),Some(2309),Some(2309),true,true,None)<==>AccumulableInfo(142,Some(internal.metrics.executorDeserializeTime),Some(52),Some(52),true,true,None)
taskMetrics LongAccumulator(id: 142, name: Some(internal.metrics.executorDeserializeTime), value: 52)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 144, name: Some(internal.metrics.executorRunTime), value: 2309)<==>LongAccumulator(id: 145, name: Some(internal.metrics.executorCpuTime), value: 1203125000)<==>LongAccumulator(id: 146, name: Some(internal.metrics.resultSize), value: 1686)<==>LongAccumulator(id: 147, name: Some(internal.metrics.jvmGCTime), value: 46)<==>LongAccumulator(id: 148, name: Some(internal.metrics.resultSerializationTime), value: 2)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>LongAccumulator(id: 153, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0)<==>LongAccumulator(id: 154, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 4)<==>LongAccumulator(id: 155, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0)<==>LongAccumulator(id: 156, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 4436289)<==>LongAccumulator(id: 157, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0)<==>LongAccumulator(id: 158, name: Some(internal.metrics.shuffle.read.recordsRead), value: 200041)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 117, name: Some(number of output rows), value: 200041)
====================onTaskEnd====================
====================onTaskEnd====================
taskend reason Success
stageId sid 4
taskInfo AccumulableInfo(117,Some(number of output rows),Some(200041),Some(400082),true,true,Some(sql))<==>AccumulableInfo(158,Some(internal.metrics.shuffle.read.recordsRead),Some(200041),Some(400082),true,true,None)<==>AccumulableInfo(157,Some(internal.metrics.shuffle.read.fetchWaitTime),Some(0),Some(0),true,true,None)<==>AccumulableInfo(156,Some(internal.metrics.shuffle.read.localBytesRead),Some(4434760),Some(8871049),true,true,None)<==>AccumulableInfo(155,Some(internal.metrics.shuffle.read.remoteBytesRead),Some(0),Some(0),true,true,None)<==>AccumulableInfo(154,Some(internal.metrics.shuffle.read.localBlocksFetched),Some(4),Some(8),true,true,None)<==>AccumulableInfo(153,Some(internal.metrics.shuffle.read.remoteBlocksFetched),Some(0),Some(0),true,true,None)<==>AccumulableInfo(147,Some(internal.metrics.jvmGCTime),Some(46),Some(92),true,true,None)<==>AccumulableInfo(146,Some(internal.metrics.resultSize),Some(1643),Some(3329),true,true,None)<==>AccumulableInfo(145,Some(internal.metrics.executorCpuTime),Some(1421875000),Some(2625000000),true,true,None)<==>AccumulableInfo(144,Some(internal.metrics.executorRunTime),Some(2335),Some(4644),true,true,None)<==>AccumulableInfo(142,Some(internal.metrics.executorDeserializeTime),Some(43),Some(95),true,true,None)
taskMetrics LongAccumulator(id: 142, name: Some(internal.metrics.executorDeserializeTime), value: 43)<==>Un-registered Accumulator: LongAccumulator<==>LongAccumulator(id: 144, name: Some(internal.metrics.executorRunTime), value: 2335)<==>LongAccumulator(id: 145, name: Some(internal.metrics.executorCpuTime), value: 1421875000)<==>LongAccumulator(id: 146, name: Some(internal.metrics.resultSize), value: 1643)<==>LongAccumulator(id: 147, name: Some(internal.metrics.jvmGCTime), value: 46)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>LongAccumulator(id: 153, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0)<==>LongAccumulator(id: 154, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 4)<==>LongAccumulator(id: 155, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0)<==>LongAccumulator(id: 156, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 4434760)<==>LongAccumulator(id: 157, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0)<==>LongAccumulator(id: 158, name: Some(internal.metrics.shuffle.read.recordsRead), value: 200041)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 117, name: Some(number of output rows), value: 200041)
====================onTaskEnd====================
[Stage 4:=======================>                                   (2 + 3) / 5]====================onTaskEnd====================
taskend reason Success
stageId sid 4
taskInfo AccumulableInfo(117,Some(number of output rows),Some(200043),Some(600125),true,true,Some(sql))<==>AccumulableInfo(158,Some(internal.metrics.shuffle.read.recordsRead),Some(200043),Some(600125),true,true,None)<==>AccumulableInfo(157,Some(internal.metrics.shuffle.read.fetchWaitTime),Some(0),Some(0),true,true,None)<==>AccumulableInfo(156,Some(internal.metrics.shuffle.read.localBytesRead),Some(4434141),Some(13305190),true,true,None)<==>AccumulableInfo(155,Some(internal.metrics.shuffle.read.remoteBytesRead),Some(0),Some(0),true,true,None)<==>AccumulableInfo(154,Some(internal.metrics.shuffle.read.localBlocksFetched),Some(4),Some(12),true,true,None)<==>AccumulableInfo(153,Some(internal.metrics.shuffle.read.remoteBlocksFetched),Some(0),Some(0),true,true,None)<==>AccumulableInfo(147,Some(internal.metrics.jvmGCTime),Some(46),Some(138),true,true,None)<==>AccumulableInfo(146,Some(internal.metrics.resultSize),Some(1686),Some(5015),true,true,None)<==>AccumulableInfo(145,Some(internal.metrics.executorCpuTime),Some(1328125000),Some(3953125000),true,true,None)<==>AccumulableInfo(144,Some(internal.metrics.executorRunTime),Some(2362),Some(7006),true,true,None)<==>AccumulableInfo(143,Some(internal.metrics.executorDeserializeCpuTime),Some(15625000),Some(15625000),true,true,None)<==>AccumulableInfo(142,Some(internal.metrics.executorDeserializeTime),Some(46),Some(141),true,true,None)
taskMetrics LongAccumulator(id: 142, name: Some(internal.metrics.executorDeserializeTime), value: 46)<==>LongAccumulator(id: 143, name: Some(internal.metrics.executorDeserializeCpuTime), value: 15625000)<==>LongAccumulator(id: 144, name: Some(internal.metrics.executorRunTime), value: 2362)<==>LongAccumulator(id: 145, name: Some(internal.metrics.executorCpuTime), value: 1328125000)<==>LongAccumulator(id: 146, name: Some(internal.metrics.resultSize), value: 1686)<==>LongAccumulator(id: 147, name: Some(internal.metrics.jvmGCTime), value: 46)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>LongAccumulator(id: 153, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0)<==>LongAccumulator(id: 154, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 4)<==>LongAccumulator(id: 155, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0)<==>LongAccumulator(id: 156, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 4434141)<==>LongAccumulator(id: 157, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0)<==>LongAccumulator(id: 158, name: Some(internal.metrics.shuffle.read.recordsRead), value: 200043)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 117, name: Some(number of output rows), value: 200043)
====================onTaskEnd====================
====================onTaskEnd====================
taskend reason Success
stageId sid 4
taskInfo AccumulableInfo(117,Some(number of output rows),Some(200042),Some(800167),true,true,Some(sql))<==>AccumulableInfo(158,Some(internal.metrics.shuffle.read.recordsRead),Some(200042),Some(800167),true,true,None)<==>AccumulableInfo(157,Some(internal.metrics.shuffle.read.fetchWaitTime),Some(0),Some(0),true,true,None)<==>AccumulableInfo(156,Some(internal.metrics.shuffle.read.localBytesRead),Some(4435220),Some(17740410),true,true,None)<==>AccumulableInfo(155,Some(internal.metrics.shuffle.read.remoteBytesRead),Some(0),Some(0),true,true,None)<==>AccumulableInfo(154,Some(internal.metrics.shuffle.read.localBlocksFetched),Some(4),Some(16),true,true,None)<==>AccumulableInfo(153,Some(internal.metrics.shuffle.read.remoteBlocksFetched),Some(0),Some(0),true,true,None)<==>AccumulableInfo(147,Some(internal.metrics.jvmGCTime),Some(46),Some(184),true,true,None)<==>AccumulableInfo(146,Some(internal.metrics.resultSize),Some(1686),Some(6701),true,true,None)<==>AccumulableInfo(145,Some(internal.metrics.executorCpuTime),Some(1343750000),Some(5296875000),true,true,None)<==>AccumulableInfo(144,Some(internal.metrics.executorRunTime),Some(2385),Some(9391),true,true,None)<==>AccumulableInfo(143,Some(internal.metrics.executorDeserializeCpuTime),Some(15625000),Some(31250000),true,true,None)<==>AccumulableInfo(142,Some(internal.metrics.executorDeserializeTime),Some(41),Some(182),true,true,None)
taskMetrics LongAccumulator(id: 142, name: Some(internal.metrics.executorDeserializeTime), value: 41)<==>LongAccumulator(id: 143, name: Some(internal.metrics.executorDeserializeCpuTime), value: 15625000)<==>LongAccumulator(id: 144, name: Some(internal.metrics.executorRunTime), value: 2385)<==>LongAccumulator(id: 145, name: Some(internal.metrics.executorCpuTime), value: 1343750000)<==>LongAccumulator(id: 146, name: Some(internal.metrics.resultSize), value: 1686)<==>LongAccumulator(id: 147, name: Some(internal.metrics.jvmGCTime), value: 46)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>LongAccumulator(id: 153, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0)<==>LongAccumulator(id: 154, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 4)<==>LongAccumulator(id: 155, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0)<==>LongAccumulator(id: 156, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 4435220)<==>LongAccumulator(id: 157, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0)<==>LongAccumulator(id: 158, name: Some(internal.metrics.shuffle.read.recordsRead), value: 200042)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 117, name: Some(number of output rows), value: 200042)
====================onTaskEnd====================
====================onExecutorMetricsUpdate====================
execId   driver
4    14    AccumulableInfo(117,Some(number of output rows),Some(20678),None,true,true,Some(sql))
====================onExecutorMetricsUpdate====================
[Stage 4:===============================================>           (4 + 1) / 5]====================onTaskEnd====================
                                                                                taskend reason Success
stageId sid 4
taskInfo AccumulableInfo(117,Some(number of output rows),Some(200042),Some(1000209),true,true,Some(sql))<==>AccumulableInfo(158,Some(internal.metrics.shuffle.read.recordsRead),Some(200042),Some(1000209),true,true,None)<==>AccumulableInfo(157,Some(internal.metrics.shuffle.read.fetchWaitTime),Some(0),Some(0),true,true,None)<==>AccumulableInfo(156,Some(internal.metrics.shuffle.read.localBytesRead),Some(4433655),Some(22174065),true,true,None)<==>AccumulableInfo(155,Some(internal.metrics.shuffle.read.remoteBytesRead),Some(0),Some(0),true,true,None)<==>AccumulableInfo(154,Some(internal.metrics.shuffle.read.localBlocksFetched),Some(4),Some(20),true,true,None)<==>AccumulableInfo(153,Some(internal.metrics.shuffle.read.remoteBlocksFetched),Some(0),Some(0),true,true,None)<==>AccumulableInfo(146,Some(internal.metrics.resultSize),Some(1643),Some(8344),true,true,None)<==>AccumulableInfo(145,Some(internal.metrics.executorCpuTime),Some(687500000),Some(5984375000),true,true,None)<==>AccumulableInfo(144,Some(internal.metrics.executorRunTime),Some(715),Some(10106),true,true,None)<==>AccumulableInfo(143,Some(internal.metrics.executorDeserializeCpuTime),Some(15625000),Some(46875000),true,true,None)<==>AccumulableInfo(142,Some(internal.metrics.executorDeserializeTime),Some(12),Some(194),true,true,None)
taskMetrics LongAccumulator(id: 142, name: Some(internal.metrics.executorDeserializeTime), value: 12)<==>LongAccumulator(id: 143, name: Some(internal.metrics.executorDeserializeCpuTime), value: 15625000)<==>LongAccumulator(id: 144, name: Some(internal.metrics.executorRunTime), value: 715)<==>LongAccumulator(id: 145, name: Some(internal.metrics.executorCpuTime), value: 687500000)<==>LongAccumulator(id: 146, name: Some(internal.metrics.resultSize), value: 1643)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: CollectionAccumulator<==>LongAccumulator(id: 153, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0)<==>LongAccumulator(id: 154, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 4)<==>LongAccumulator(id: 155, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0)<==>LongAccumulator(id: 156, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 4433655)<==>LongAccumulator(id: 157, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0)<==>LongAccumulator(id: 158, name: Some(internal.metrics.shuffle.read.recordsRead), value: 200042)<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>Un-registered Accumulator: LongAccumulator<==>SQLMetric(id: 117, name: Some(number of output rows), value: 200042)
====================onTaskEnd====================
====================onStageCompleted====================
stageId 4
<<<stageId  4__fgf__rddname MapPartitionsRDD,xxxxxxxxxxxxxxxxxxxxxxxxx,ShuffledRDD,CoalescedRDD,MapPartitionsRDD rddname__fgf__si.details org.apache.spark.ListenerTest.main(ListenerTest.scala)__fgf__si.name main at <unknown>:0__fgf__taskmetricLongAccumulator(id: 142, name: Some(internal.metrics.executorDeserializeTime), value: 194),LongAccumulator(id: 143, name: Some(internal.metrics.executorDeserializeCpuTime), value: 46875000),LongAccumulator(id: 144, name: Some(internal.metrics.executorRunTime), value: 10106),LongAccumulator(id: 145, name: Some(internal.metrics.executorCpuTime), value: 5984375000),LongAccumulator(id: 146, name: Some(internal.metrics.resultSize), value: 8344),LongAccumulator(id: 147, name: Some(internal.metrics.jvmGCTime), value: 184),LongAccumulator(id: 148, name: Some(internal.metrics.resultSerializationTime), value: 2),LongAccumulator(id: 149, name: Some(internal.metrics.memoryBytesSpilled), value: 0),LongAccumulator(id: 150, name: Some(internal.metrics.diskBytesSpilled), value: 0),LongAccumulator(id: 151, name: Some(internal.metrics.peakExecutionMemory), value: 0),CollectionAccumulator(id: 152, name: Some(internal.metrics.updatedBlockStatuses), value: []),LongAccumulator(id: 153, name: Some(internal.metrics.shuffle.read.remoteBlocksFetched), value: 0),LongAccumulator(id: 154, name: Some(internal.metrics.shuffle.read.localBlocksFetched), value: 20),LongAccumulator(id: 155, name: Some(internal.metrics.shuffle.read.remoteBytesRead), value: 0),LongAccumulator(id: 156, name: Some(internal.metrics.shuffle.read.localBytesRead), value: 22174065),LongAccumulator(id: 157, name: Some(internal.metrics.shuffle.read.fetchWaitTime), value: 0),LongAccumulator(id: 158, name: Some(internal.metrics.shuffle.read.recordsRead), value: 1000209),LongAccumulator(id: 159, name: Some(internal.metrics.shuffle.write.bytesWritten), value: 0),LongAccumulator(id: 160, name: Some(internal.metrics.shuffle.write.recordsWritten), value: 0),LongAccumulator(id: 161, name: 
Some(internal.metrics.shuffle.write.writeTime), value: 0),LongAccumulator(id: 162, name: Some(internal.metrics.input.bytesRead), value: 0),LongAccumulator(id: 163, name: Some(internal.metrics.input.recordsRead), value: 0),LongAccumulator(id: 164, name: Some(internal.metrics.output.bytesWritten), value: 0),LongAccumulator(id: 165, name: Some(internal.metrics.output.recordsWritten), value: 0)taskmetric__fgf__accu146 -> AccumulableInfo(146,Some(internal.metrics.resultSize),None,Some(8344),true,true,None),155 -> AccumulableInfo(155,Some(internal.metrics.shuffle.read.remoteBytesRead),None,Some(0),true,true,None),158 -> AccumulableInfo(158,Some(internal.metrics.shuffle.read.recordsRead),None,Some(1000209),true,true,None),142 -> AccumulableInfo(142,Some(internal.metrics.executorDeserializeTime),None,Some(194),true,true,None),145 -> AccumulableInfo(145,Some(internal.metrics.executorCpuTime),None,Some(5984375000),true,true,None),154 -> AccumulableInfo(154,Some(internal.metrics.shuffle.read.localBlocksFetched),None,Some(20),true,true,None),148 -> AccumulableInfo(148,Some(internal.metrics.resultSerializationTime),None,Some(2),true,true,None),157 -> AccumulableInfo(157,Some(internal.metrics.shuffle.read.fetchWaitTime),None,Some(0),true,true,None),147 -> AccumulableInfo(147,Some(internal.metrics.jvmGCTime),None,Some(184),true,true,None),156 -> AccumulableInfo(156,Some(internal.metrics.shuffle.read.localBytesRead),None,Some(22174065),true,true,None),144 -> AccumulableInfo(144,Some(internal.metrics.executorRunTime),None,Some(10106),true,true,None),153 -> AccumulableInfo(153,Some(internal.metrics.shuffle.read.remoteBlocksFetched),None,Some(0),true,true,None),117 -> AccumulableInfo(117,Some(number of output rows),None,Some(1000209),true,true,Some(sql)),143 -> AccumulableInfo(143,Some(internal.metrics.executorDeserializeCpuTime),None,Some(46875000),true,true,None)accu__fgf__si.numTasks 5>>>
====================onStageCompleted====================
====================onJobEnd====================
jobEnd.jobId  3
jobEnd.jobResult  JobSucceeded
====================onJobEnd====================
====================onApplicationEnd====================
applicationEnd.time  =  1543335661199
====================onApplicationEnd====================
 
Process finished with exit code 0
 
 

 

以上是关于 Spark Listener 的主要内容，如果未能解决你的问题，请参考以下文章。

JavaWebListener监听

spring中的IoC

Hadoop的Server及其线程模型分析

Android中三种创建监听器的方法

科普Spark,Spark是什么,如何使用Spark

Spark面试题——Spark资源调优