Spark监听简记
Posted DataRain
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark监听简记相关的知识,希望对你有一定的参考价值。
Spark提供了任务监听接口来给我们提供了自定义的任务结果监听入口,目前来看这方面的API并不是很友好,使用起来并不是很方便我们去实现自定义功能,但是也好过没有。最近实现了一个spark sql的进度条功能使用到了这些监听API,这里稍微记录一下其基本使用方式,如果需要更为详细的使用方式可以参考Spark History Server的实现,注意下spark.sql.ui这个包下的源码。
首先使用这个监听接口需要继承SparkListener抽象类,或者实现SparkListenerInterface接口,然后在对应的响应方法里面实现自己的业务逻辑,下面是搬运过来的源码:
abstract class SparkListener extends SparkListenerInterface {
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { }
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { }
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { }
override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit = { }
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { }
override def onJobStart(jobStart: SparkListenerJobStart): Unit = { }
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { }
override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = { }
override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = { }
override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = { }
override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = { }
override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { }
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { }
override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = { }
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { }
override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { }
override def onExecutorBlacklisted(executorBlacklisted: SparkListenerExecutorBlacklisted)
def onExecutorBlacklistedForStage(executorBlacklistedForStage: SparkListenerExecutorBlacklistedForStage)
def onNodeBlacklistedForStage(nodeBlacklistedForStage: SparkListenerNodeBlacklistedForStage)
override def onExecutorUnblacklisted(executorUnblacklisted: SparkListenerExecutorUnblacklisted)
override def onNodeBlacklisted(nodeBlacklisted: SparkListenerNodeBlacklisted): Unit = { }
override def onNodeUnblacklisted(nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit = { }
override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }
override def onSpeculativeTaskSubmitted(speculativeTask: SparkListenerSpeculativeTaskSubmitted)
override def onOtherEvent(event: SparkListenerEvent): Unit = { }
}
这个类方法比较多,但是其实我们在上面进行业务相关的扩展使用的话,基本上用到的也就和job、stage、task相关的几个方法,这里也重点记录以下几个常用的:
public void onJobStart(SparkListenerJobStart jobStart)
这个方法挺重要的,例如我可以通过jobStart.jobId()方法来获取我们正在执行的job的ID,这个ID是job的唯一识别码,可以通过这个ID唯一定位到这个job(这里吐槽一下,是定位到这个job,但是没法定位到具体是代码里面的哪个job,或者是哪个spark sql任务里面对应的job,如果需要对应关系的话需要我们做一些骚操作才行)。
骚操作是这样的:首先我们可以通过jobStart.properties()方法来获取这个job的熟悉参数,例如通过properties.getProperty(“spark,jobGroup.id”)来获取这个job的group id。既然可以获取到这个参数,那么我在执行spark sql的时候设置这个参数,就可以通过这个group id关联起spark sql和对应的job啦!下面是设置这个参数的方式:
sparkSession.sqlContext().sparkContext()
.setJobGroup("job group id", "job group descript", false);
而在jobStart中也可以获取到这个job所有的stage ID,用于关联job ID和stage ID,获取即将要执行的stage和task数目,这个信息比较重要,其实Spark History Server中的job任务进度信息就是通过这个获取的。
public void onJobEnd(SparkListenerJobEnd jobEnd)
这个方法可以用来识别这个job是否已经执行完毕,通过jobEnd.jobId()方法可以获取job的ID,这样就知道是哪个任务执行完毕了。这里需要注意的是,执行完毕不等于执行成功,可以通过jobEnd.jobResult()获取的值来判断这个job是否执行成功:
//这个是判断是否失败的:
jobResult instanceof JobFailed
//这个是判断是否成功的:
JobSucceeded$.MODULE$.canEqual(jobResult)
可能你会好奇为啥这个判断语法不一样?因为这个scala实现时候的坑爹方式,至于为什么要这样做我也想知道啊。。。。。。下面就是JobResult的定义,JobSucceeded是case伴生对象,而JobFailed是一个case class,当然是不一样的。差点收不住我的40米大刀。。。。。。
@DeveloperApi
sealed trait JobResult
@DeveloperApi
case object JobSucceeded extends JobResult
private[spark] case class JobFailed(exception: Exception) extends JobResult
public void onApplicationStart(SparkListenerApplicationStart applicationStart)
public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd)
这两个方法是用于监听spark程序的启动和退出的,具体方法就不细说,一般情况下用处不大,可以在spark程序启动后和关闭后做一些事情,例如记录下spark程序的执行时间等。
public void onStageCompleted(SparkListenerStageCompleted stageCompleted)
在一个stage执行完成后会调用这个方法,可以通过stageCompleted.stageInfo().stageId()方法来获取这个stage的id,这个就可以判断这个stage是位于哪个job下的(为啥要通过这么曲折的方式关联stage和job呢,因为SparkListenerStageCompleted没有提供获取job ID的方法,唉,不想吐槽了)
如上所述,知道了一个job一共有多少个stage、有多少个stage已经执行完毕了,就可以计算出job运行的进度了,当然也可以通过再多一个层级的task来获取,不过不精确的话stage获取也是可以的,类似于history server中显示的job进度,也是通过这种方式获取的:
实现spark sql的进度也是如下,spark sql的job取决于后续的执行流程是否只有一次输出,也和spark sql解析引擎有关系,亲测在简单查询的情况下只有一个job,sql和job就一一对应了,这样就可以获取spark sql的执行进度。不过history-server中是没有实现spark sql的进度查询的,为什么呢?大概是因为在spark sql对应多个job时,无法提前获取job的数目因此无法获取其进度,我们自己实现的话那就更费劲了,但是可以对简单spark sql做到进度查询,这已经是涵盖很多使用场景了。这里关联spark sql和对应的job也是通过在jobStart方法的时候进行关联的,因为spark sql并没有提供其他关联方法。Spark sql唯一提供给我们的方法是位于这个方法里面:
public void onOtherEvent(SparkListenerEvent event)
这里的event有几个子类,只要判断是这几个子类中的其中一个,就可以知道这个spark sql是否开始执行或者已经执行完毕,这是大概的使用示例:
if (SparkListenerSQLExecutionStart.class.isInstance(event)) {
SparkListenerSQLExecutionStart start = (SparkListenerSQLExecutionStart) event;
long startTime = start.time();
long executeId = start.executionId();
InfoLogger.info("sql_start." + "executeid:" + executeId + " .starttime:" + startTime);
}
if (SparkListenerSQLExecutionEnd.class.isInstance(event)) {
SparkListenerSQLExecutionEnd end = (SparkListenerSQLExecutionEnd) event;
long endTime = end.time();
long executeId = end.executionId();
InfoLogger.info("sql_end." + "executeid:" + executeId + " .endtime:" + endTime);
}
if (SparkListenerDriverAccumUpdates.class.isInstance(event))){
SparkListenerDriverAccumUpdates updates = (SparkListenerDriverAccumUpdates) event;
String info = updates.accumUpdates().mkString();
InfoLogger.info("sql_update." + info);
}
其实那几个子类是这样子定义的,大概写了怎么去用的样子,具体也可以参考spark history server的源码实现,可以找到很多有用的使用方式:
case class SparkListenerSQLExecutionStart(
executionId: Long,
description: String,
details: String,
physicalPlanDescription: String,
sparkPlanInfo: SparkPlanInfo,
time: Long) extends SparkListenerEvent
case class SparkListenerSQLExecutionEnd(executionId: Long,
time: Long)extends SparkListenerEvent
/**
* A message used to update SQL metric value for driver-side updates (which doesn't get reflected
* automatically).
*
* @param executionId The execution id for a query, so we can find the query plan.
* @param accumUpdates Map from accumulator id to the metric value (metrics are always 64-bit ints).
*/
case class SparkListenerDriverAccumUpdates(
executionId: Long,
accumUpdates: Seq[(Long, Long)]) extends SparkListenerEvent
这些在spark调度上使用上还是可以的,起码是提供了这些东西了,好不好用暂且不说,反正是有了,能用就行能用就行~~
以上是关于Spark监听简记的主要内容,如果未能解决你的问题,请参考以下文章
在这个 spark 代码片段中 ordering.by 是啥意思?
spark关于join后有重复列的问题(org.apache.spark.sql.AnalysisException: Reference '*' is ambiguous)(代码片段