如何在 Python 中从 pySpark 添加 SparkListener?

Posted

技术标签:

【中文标题】如何在 Python 中从 pySpark 添加 SparkListener?【英文标题】:How to add a SparkListener from pySpark in Python? 【发布时间】:2017-10-20 08:18:09 【问题描述】:

我想创建一个 Jupyter/IPython 扩展来监控 Apache Spark 作业。

Spark 提供 REST API。

但是,我希望通过回调发送事件更新,而不是轮询服务器。

我正在尝试使用SparkContext.addSparkListener() 注册SparkListener。此功能在 Python 中的 PySpark SparkContext 对象中不可用。那么如何从 Python 注册一个 Python 监听器到 Scala/Java 版本的上下文。是否可以通过py4j 做到这一点?我希望在侦听器中触发事件时调用 python 函数。

【问题讨论】:

【参考方案1】:

这是可能的,虽然它有点复杂。我们可以使用 Py4j callback mechanism 来传递来自 SparkListener 的消息。首先让我们创建一个包含所有必需类的 Scala 包。目录结构:

.
├── build.sbt
└── src
    └── main
        └── scala
            └── net
                └── zero323
                    └── spark
                        └── examples
                            └── listener
                                ├── Listener.scala
                                ├── Manager.scala
                                └── TaskListener.scala

build.sbt:

name := "listener"

organization := "net.zero323"

scalaVersion := "2.11.7"

val sparkVersion = "2.1.0"

libraryDependencies ++= List(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "net.sf.py4j" % "py4j" % "0.10.4"  // Just for the record
)

Listener.scala 定义了我们稍后要实现的 Python 接口

package net.zero323.spark.examples.listener

/* You can add arbitrary methods here, 
 * as long as these match corresponding Python interface 
 */
trait Listener 
  /* This will be implemented by a Python class.
   * You can of course use more specific types, 
   * for example here String => Unit */
  def notify(x: Any): Any

Manager.scala 将用于将消息转发给 Python 监听器:

package net.zero323.spark.examples.listener

object Manager 
  var listeners: Map[String, Listener] = Map()

  def register(listener: Listener): String = 
    this.synchronized 
      val uuid = java.util.UUID.randomUUID().toString
      listeners = listeners + (uuid -> listener)
      uuid
    
  

  def unregister(uuid: String) = 
    this.synchronized 
      listeners = listeners - uuid
    
  

  def notifyAll(message: String): Unit = 
    for  (_, listener) <- listeners  listener.notify(message)
  


终于有一个简单的SparkListener

package net.zero323.spark.examples.listener

import org.apache.spark.scheduler.SparkListener, SparkListenerTaskEnd
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

/* A simple listener which captures SparkListenerTaskEnd,
 * extracts numbers of records written by the task
 * and converts to JSON. You can of course add handlers 
 * for other events as well.
 */
class PythonNotifyListener extends SparkListener  
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) 
    val recordsWritten = taskEnd.taskMetrics.outputMetrics.recordsWritten
    val message = compact(render(
      ("recordsWritten" ->  recordsWritten)
    ))
    Manager.notifyAll(message)
  

让我们打包我们的扩展:

sbt package

并启动 PySpark 会话,将生成的 jar 添加到类路径并注册侦听器:

 $SPARK_HOME/bin/pyspark \
   --driver-class-path target/scala-2.11/listener_2.11-0.1-SNAPSHOT.jar \
   --conf spark.extraListeners=net.zero323.spark.examples.listener.PythonNotifyListener

接下来我们要定义一个实现Listener接口的Python对象:

class PythonListener(object):
    package = "net.zero323.spark.examples.listener"

    @staticmethod
    def get_manager():
        jvm = SparkContext.getOrCreate()._jvm
        manager = getattr(jvm, ".".format(PythonListener.package, "Manager"))
        return manager

    def __init__(self):
        self.uuid = None

    def notify(self, obj):
        """This method is required by Scala Listener interface
        we defined above.
        """
        print(obj)

    def register(self):
        manager = PythonListener.get_manager()
        self.uuid = manager.register(self)
        return self.uuid

    def unregister(self):
        manager =  PythonListener.get_manager()
        manager.unregister(self.uuid)
        self.uuid = None

    class Java:
        implements = ["net.zero323.spark.examples.listener.Listener"]

启动回调服务器:

sc._gateway.start_callback_server()

创建和注册监听器:

listener = PythonListener()

注册:

listener.register()

和测试:

>>> sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test")
"recordsWritten":33
"recordsWritten":34
"recordsWritten":33

退出时你应该关闭回调服务器:

sc._gateway.shutdown_callback_server()

注意

在使用内部使用回调服务器的 Spark 流时应谨慎使用。

编辑

如果这很麻烦,您可以定义org.apache.spark.scheduler.SparkListenerInterface

class SparkListener(object):
    def onApplicationEnd(self, applicationEnd):
        pass
    def onApplicationStart(self, applicationStart):
        pass
    def onBlockManagerRemoved(self, blockManagerRemoved):
        pass
    def onBlockUpdated(self, blockUpdated):
        pass
    def onEnvironmentUpdate(self, environmentUpdate):
        pass
    def onExecutorAdded(self, executorAdded):
        pass
    def onExecutorMetricsUpdate(self, executorMetricsUpdate):
        pass
    def onExecutorRemoved(self, executorRemoved):
        pass
    def onJobEnd(self, jobEnd):
        pass
    def onJobStart(self, jobStart):
        pass
    def onOtherEvent(self, event):
        pass
    def onStageCompleted(self, stageCompleted):
        pass
    def onStageSubmitted(self, stageSubmitted):
        pass
    def onTaskEnd(self, taskEnd):
        pass
    def onTaskGettingResult(self, taskGettingResult):
        pass
    def onTaskStart(self, taskStart):
        pass
    def onUnpersistRDD(self, unpersistRDD):
        pass
    class Java:
        implements = ["org.apache.spark.scheduler.SparkListenerInterface"]

扩展它:

class TaskEndListener(SparkListener):
    def onTaskEnd(self, taskEnd):
        print(taskEnd.toString())

直接使用:

>>> sc._gateway.start_callback_server()
True
>>> listener = TaskEndListener()
>>> sc._jsc.sc().addSparkListener(listener)
>>> sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test_simple")
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@9e7514a,org.apache.spark.executor.TaskMetrics@51b8ba92)
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@71278a44,org.apache.spark.executor.TaskMetrics@bdc06d)
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@336)

虽然更简单,但这种方法没有选择性(JVM 和 Python 之间的流量更大)需要在 Python 会话中处理 Java 对象。

【讨论】:

只是读到这让我认真考虑切换到 scala ^^ SparkListenerInterface 已添加到最近的 Spark 版本中。根据需要更新以使用您的 Spark 版本。对于 Spark 2.4.6,我必须添加 onBlockManagerAdded 使用结构化流有什么注意事项?这个sc._gateway.start_callback_server() 不应该显式启用吗?

以上是关于如何在 Python 中从 pySpark 添加 SparkListener?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 IBM 的数据科学体验中从 pyspark 访问 postgres 表?

如何将 python 列表添加到 pyspark 列? [复制]

在 Pyspark 中从 Rest Api 创建数据框时出错

如何使用 jdbc pyspark python 在现有表中添加新列?

在 pyspark 中从另一个数据库加载表

有啥方法可以让我在一个 pyspark 脚本中从 10 个不同的模式中提取数据?