Spark监听简记

Posted DataRain

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark监听简记相关的知识,希望对你有一定的参考价值。

Spark提供了任务监听接口来给我们提供了自定义的任务结果监听入口,目前来看这方面的API并不是很友好,使用起来并不是很方便我们去实现自定义功能,但是也好过没有。最近实现了一个spark sql的进度条功能使用到了这些监听API,这里稍微记录一下其基本使用方式,如果需要更为详细的使用方式可以参考Spark History Server的实现,注意下spark.sql.ui这个包下的源码。


首先使用这个监听接口需要继承SparkListener抽象类,或者实现SparkListenerInterface接口,然后在对应的响应方法里面实现自己的业务逻辑,下面是搬运过来的源码:

abstract class SparkListener extends SparkListenerInterface {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { } override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { }
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { }  override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit = { } override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { }
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = { } override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { }
override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = { }
  override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = { }  override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = { }
  override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = { }
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { } override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { }
  override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = { }  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { }  override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { }  override def onExecutorBlacklisted(executorBlacklisted: SparkListenerExecutorBlacklisted)  def onExecutorBlacklistedForStage(executorBlacklistedForStage: SparkListenerExecutorBlacklistedForStage)    def onNodeBlacklistedForStage(nodeBlacklistedForStage: SparkListenerNodeBlacklistedForStage)  override def onExecutorUnblacklisted(executorUnblacklisted: SparkListenerExecutorUnblacklisted)
  override def onNodeBlacklisted(nodeBlacklisted: SparkListenerNodeBlacklisted): Unit = { }  override def onNodeUnblacklisted(nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit = { } override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }
  override def onSpeculativeTaskSubmitted(speculativeTask: SparkListenerSpeculativeTaskSubmitted)  override def onOtherEvent(event: SparkListenerEvent): Unit = { }}


这个类方法比较多,但是其实我们在上面进行业务相关的扩展使用的话,基本上用到的也就和job、stage、task相关的几个方法,这里也重点记录以下几个常用的:


public void onJobStart(SparkListenerJobStart jobStart)


这个方法挺重要的,例如我可以通过jobStart.jobId()方法来获取我们正在执行的job的ID,这个ID是job的唯一识别码,可以通过这个ID唯一定位到这个job(这里吐槽一下,是定位到这个job,但是没法定位到具体是代码里面的哪个job,或者是哪个spark sql任务里面对应的job,如果需要对应关系的话需要我们做一些骚操作才行)。


骚操作是这样的:首先我们可以通过jobStart.properties()方法来获取这个job的熟悉参数,例如通过properties.getProperty(“spark,jobGroup.id”)来获取这个job的group id。既然可以获取到这个参数,那么我在执行spark sql的时候设置这个参数,就可以通过这个group id关联起spark sql和对应的job啦!下面是设置这个参数的方式:

sparkSession.sqlContext().sparkContext()    .setJobGroup("job group id", "job group descript", false);


而在jobStart中也可以获取到这个job所有的stage ID,用于关联job ID和stage ID,获取即将要执行的stage和task数目,这个信息比较重要,其实Spark History Server中的job任务进度信息就是通过这个获取的。


public void onJobEnd(SparkListenerJobEnd jobEnd)


这个方法可以用来识别这个job是否已经执行完毕,通过jobEnd.jobId()方法可以获取job的ID,这样就知道是哪个任务执行完毕了。这里需要注意的是,执行完毕不等于执行成功,可以通过jobEnd.jobResult()获取的值来判断这个job是否执行成功:

//这个是判断是否失败的:jobResult instanceof JobFailed
//这个是判断是否成功的:JobSucceeded$.MODULE$.canEqual(jobResult)


可能你会好奇为啥这个判断语法不一样?因为这个scala实现时候的坑爹方式,至于为什么要这样做我也想知道啊。。。。。。下面就是JobResult的定义,JobSucceeded是case伴生对象,而JobFailed是一个case class,当然是不一样的。差点收不住我的40米大刀。。。。。。

@DeveloperApisealed trait JobResult
@DeveloperApicase object JobSucceeded extends JobResult
private[spark] case class JobFailed(exception: Exception) extends JobResult


public void onApplicationStart(SparkListenerApplicationStart applicationStart)


public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd)


这两个方法是用于监听spark程序的启动和退出的,具体方法就不细说,一般情况下用处不大,可以在spark程序启动后和关闭后做一些事情,例如记录下spark程序的执行时间等。



public void onStageCompleted(SparkListenerStageCompleted stageCompleted)


在一个stage执行完成后会调用这个方法,可以通过stageCompleted.stageInfo().stageId()方法来获取这个stage的id,这个就可以判断这个stage是位于哪个job下的(为啥要通过这么曲折的方式关联stage和job呢,因为SparkListenerStageCompleted没有提供获取job ID的方法,唉,不想吐槽了)



如上所述,知道了一个job一共有多少个stage、有多少个stage已经执行完毕了,就可以计算出job运行的进度了,当然也可以通过再多一个层级的task来获取,不过不精确的话stage获取也是可以的,类似于history server中显示的job进度,也是通过这种方式获取的:

 

实现spark sql的进度也是如下,spark sql的job取决于后续的执行流程是否只有一次输出,也和spark sql解析引擎有关系,亲测在简单查询的情况下只有一个job,sql和job就一一对应了,这样就可以获取spark sql的执行进度。不过history-server中是没有实现spark sql的进度查询的,为什么呢?大概是因为在spark sql对应多个job时,无法提前获取job的数目因此无法获取其进度,我们自己实现的话那就更费劲了,但是可以对简单spark sql做到进度查询,这已经是涵盖很多使用场景了。这里关联spark sql和对应的job也是通过在jobStart方法的时候进行关联的,因为spark sql并没有提供其他关联方法。Spark sql唯一提供给我们的方法是位于这个方法里面:


public void onOtherEvent(SparkListenerEvent event)


这里的event有几个子类,只要判断是这几个子类中的其中一个,就可以知道这个spark sql是否开始执行或者已经执行完毕,这是大概的使用示例:

if (SparkListenerSQLExecutionStart.class.isInstance(event)) { SparkListenerSQLExecutionStart start = (SparkListenerSQLExecutionStart) event; long startTime = start.time(); long executeId = start.executionId(); InfoLogger.info("sql_start." + "executeid:" + executeId + " .starttime:" + startTime);}if (SparkListenerSQLExecutionEnd.class.isInstance(event)) { SparkListenerSQLExecutionEnd end = (SparkListenerSQLExecutionEnd) event; long endTime = end.time(); long executeId = end.executionId(); InfoLogger.info("sql_end." + "executeid:" + executeId + " .endtime:" + endTime);}if (SparkListenerDriverAccumUpdates.class.isInstance(event))){ SparkListenerDriverAccumUpdates updates = (SparkListenerDriverAccumUpdates) event; String info = updates.accumUpdates().mkString(); InfoLogger.info("sql_update." + info);}


其实那几个子类是这样子定义的,大概写了怎么去用的样子,具体也可以参考spark history server的源码实现,可以找到很多有用的使用方式:

@DeveloperApicase class SparkListenerSQLExecutionStart(    executionId: Long,    description: String,    details: String,    physicalPlanDescription: String,    sparkPlanInfo: SparkPlanInfo,    time: Long) extends SparkListenerEvent
@DeveloperApicase class SparkListenerSQLExecutionEnd(executionId: Long,      time: Long)extends SparkListenerEvent/** * A message used to update SQL metric value for driver-side updates (which doesn't get reflected * automatically). * * @param executionId The execution id for a query, so we can find the query plan. * @param accumUpdates Map from accumulator id to the metric value (metrics are always 64-bit ints). */@DeveloperApicase class SparkListenerDriverAccumUpdates(    executionId: Long,    @JsonDeserialize(contentConverter = classOf[LongLongTupleConverter])    accumUpdates: Seq[(LongLong)]) extends SparkListenerEvent


这些在spark调度上使用上还是可以的,起码是提供了这些东西了,好不好用暂且不说,反正是有了,能用就行能用就行~~

以上是关于Spark监听简记的主要内容,如果未能解决你的问题,请参考以下文章

Spark 学习总结

在这个 spark 代码片段中 ordering.by 是啥意思?

python+spark程序代码片段

spark关于join后有重复列的问题(org.apache.spark.sql.AnalysisException: Reference '*' is ambiguous)(代码片段

12C GDS安装简记

Spark 学习总结