在 Spark 上下文中使用多个同时作业的 Spark 2 作业监控 (JobProgressListener)

Posted

技术标签:

【中文标题】在 Spark 上下文中使用多个同时作业的 Spark 2 作业监控 (JobProgressListener)【英文标题】:Spark 2 Job Monitoring with Multiple Simultaneous Jobs on a Spark Context (JobProgressListener) 【发布时间】:2016-12-02 15:41:57 【问题描述】:

在 Spark 2.0.x 上,我一直在使用 JobProgressListener 实现从我们的集群中实时检索 Job/Stage/Task 进度信息。我了解事件流程的工作原理,并成功接收到工作更新。

我的问题是我们在同一个 Spark Context 上同时运行了几个不同的提交,并且似乎无法区分每个提交属于哪个 Job/Stage/Task。每个 Job/Stage/Task 都会收到一个唯一的 id,这很棒。但是,我正在寻找一种方法来提供将与收到的 JobProgressListener 事件对象一起返回的提交“id”或“name”。

我意识到可以在 Spark Context 上设置“作业组”,但是如果多个作业同时在同一个上下文上运行,它们就会变得混乱。

有没有一种方法可以潜入自定义属性,这些属性将与单个 SQLContext 的侦听器事件一起返回?这样做,我应该能够链接后续的 Stage 和 Task 事件并获得我需要的东西。

请注意:我没有为这些工作使用 spark-submit。它们是使用对 SparkSession/SQLContext 的 Java 引用执行的。

感谢您提供任何解决方案或想法。

【问题讨论】:

【参考方案1】:

我正在使用本地属性 - 这可以在 onStageSubmit 事件期间从侦听器访问。之后,我使用相应的 stageId 来识别在该阶段执行的任务。

Future(
      sc.setLocalProperty("job-context", "second")
      val listener = new MetricListener("second")
      sc.addSparkListener(listener)
      //do some spark actions
      val df = spark.read.load("...")
      val countResult = df.filter(....).count()
      println(listener.rows)
      sc.removeSparkListener(listener)
    )

class MetricListener(name:String) extends SparkListener

  var rows: Long = 0L
  var stageId = -1
  
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = 
    if (stageSubmitted.properties.getProperty("job-context") == name)
      stageId = stageSubmitted.stageInfo.stageId
    
  

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = 
    if (taskEnd.stageId == stageId)
      rows = rows + taskEnd.taskMetrics.inputMetrics.recordsRead
  
  

【讨论】:

以上是关于在 Spark 上下文中使用多个同时作业的 Spark 2 作业监控 (JobProgressListener)的主要内容,如果未能解决你的问题,请参考以下文章

Spark[四]——Spark并行度

Spark:从具有不同内存/核心配置的单个JVM作业同时启动

每次在纱线中执行批处理作业时都会创建 Spark 上下文

Spark面试题——Spark的内存管理机制

Spark 内存管理

Spark 中用 Scala 和 java 开发有啥区别