在 Spark 上下文中使用多个同时作业的 Spark 2 作业监控 (JobProgressListener)
Posted
技术标签:
【中文标题】在 Spark 上下文中使用多个同时作业的 Spark 2 作业监控 (JobProgressListener)【英文标题】:Spark 2 Job Monitoring with Multiple Simultaneous Jobs on a Spark Context (JobProgressListener) 【发布时间】:2016-12-02 15:41:57 【问题描述】:在 Spark 2.0.x 上,我一直在使用 JobProgressListener 实现从我们的集群中实时检索 Job/Stage/Task 进度信息。我了解事件流程的工作原理,并成功接收到工作更新。
我的问题是我们在同一个 Spark Context 上同时运行了几个不同的提交,并且似乎无法区分每个提交属于哪个 Job/Stage/Task。每个 Job/Stage/Task 都会收到一个唯一的 id,这很棒。但是,我正在寻找一种方法来提供将与收到的 JobProgressListener 事件对象一起返回的提交“id”或“name”。
我意识到可以在 Spark Context 上设置“作业组”,但是如果多个作业同时在同一个上下文上运行,它们就会变得混乱。
有没有一种方法可以潜入自定义属性,这些属性将与单个 SQLContext 的侦听器事件一起返回?这样做,我应该能够链接后续的 Stage 和 Task 事件并获得我需要的东西。
请注意:我没有为这些工作使用 spark-submit。它们是使用对 SparkSession/SQLContext 的 Java 引用执行的。
感谢您提供任何解决方案或想法。
【问题讨论】:
【参考方案1】:我正在使用本地属性 - 这可以在 onStageSubmit 事件期间从侦听器访问。之后,我使用相应的 stageId 来识别在该阶段执行的任务。
Future(
sc.setLocalProperty("job-context", "second")
val listener = new MetricListener("second")
sc.addSparkListener(listener)
//do some spark actions
val df = spark.read.load("...")
val countResult = df.filter(....).count()
println(listener.rows)
sc.removeSparkListener(listener)
)
class MetricListener(name:String) extends SparkListener
var rows: Long = 0L
var stageId = -1
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit =
if (stageSubmitted.properties.getProperty("job-context") == name)
stageId = stageSubmitted.stageInfo.stageId
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
if (taskEnd.stageId == stageId)
rows = rows + taskEnd.taskMetrics.inputMetrics.recordsRead
【讨论】:
以上是关于在 Spark 上下文中使用多个同时作业的 Spark 2 作业监控 (JobProgressListener)的主要内容,如果未能解决你的问题,请参考以下文章