Spark Listener / ListenerBus Source Code Analysis

Posted by chouc


ListenerBus

Spark needs to listen for and react to events in many places, and this is where Listeners come in.

For example: what should happen when a batch completes, or when the stream starts? A more concrete example: I want to see how many records each batch contains.

Different scenarios use different Listeners, for example StreamingListener, SparkListener and StreamingQueryListener. Each kind of Listener has a corresponding ListenerBus: StreamingListenerBus, SparkListenerBus, StreamingQueryListenerBus. The word "Bus" implies that a single bus dispatches events to multiple Listeners.
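
All of these buses share one small contract. A simplified sketch of the shape of Spark's generic ListenerBus trait (method bodies elided; see the real source for the full version):

// A bus is generic in its listener type L and event type E; each concrete bus
// decides how one event is delivered to one listener by overriding doPostEvent.
trait ListenerBus[L <: AnyRef, E] {
  def addListener(listener: L): Unit
  def removeListener(listener: L): Unit
  def postToAll(event: E): Unit                           // fan out to every listener
  protected def doPostEvent(listener: L, event: E): Unit  // deliver to one listener
}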

The ListenerBus runs on the Driver; events may be posted from the Driver or from Executors.

I will use StreamingListener as the example.

First, define the DStreamListener we want. The concrete requirement is to react to different events: for example, the number of records in each batch, or which offset range a batch covers.

Demo

class DStreamListener extends StreamingListener{
  /** Called when the streaming has been started */
  override def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit = {
  }


  /** Called when a receiver has been started */
  override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
    receiverStarted.receiverInfo.streamId
  }
  
  /** Called when processing of a batch of jobs has started.  */
  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
    // number of records in this batch
    println(s"batchStarted numRecords ${batchStarted.batchInfo.numRecords}")
  }
  
  // ... more event callbacks are available in StreamingListener
}
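
A minimal sketch that also answers the "from which offset to which offset" part of the requirement. It assumes the Kafka 0-10 direct stream used in the demo below, which publishes its offset ranges in each batch's input info under the "offsets" metadata key:

import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class OffsetReportingListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    batchCompleted.batchInfo.streamIdToInputInfo.foreach { case (streamId, inputInfo) =>
      // the direct Kafka stream stores a List[OffsetRange] under the "offsets" key
      inputInfo.metadata.get("offsets").foreach { offsets =>
        offsets.asInstanceOf[List[OffsetRange]].foreach { range =>
          println(s"stream $streamId ${range.topic}-${range.partition}: " +
            s"[${range.fromOffset}, ${range.untilOffset}) = ${range.count()} records")
        }
      }
    }
  }
}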

Then register this Listener with the StreamingContext:

object KafkaDirectDstream {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("KafkaDirectDstream")
    sparkConf.setMaster("local[*]")
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "1")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val streamingContext = new StreamingContext(sparkConf, Seconds(2))
    streamingContext.sparkContext.setLogLevel("ERROR")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "s1:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "p1",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("test_mxb")
    val dstream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    dstream.map(record => (record.key, record.value, record.partition(), record.offset()))
      .foreachRDD(rdd => {
        // ... process the RDD here
      })
      
    // register the listener
    streamingContext.addStreamingListener(new DStreamListener)
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

StreamingContext

// Adds the Listener to the StreamingListenerBus

def addStreamingListener(streamingListener: StreamingListener) {
  scheduler.listenerBus.addListener(streamingListener)
}

ListenerBus

Under the hood, the listener is appended to a CopyOnWriteArrayList, which is a thread-safe List:

private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])]

// append to the list
final def addListener(listener: L): Unit = {
  listenersPlusTimers.add((listener, getTimer(listener)))
}
    
/**
 * Returns a CodaHale metrics Timer for measuring the listener's event processing time.
 * This method is intended to be overridden by subclasses.
 */
protected def getTimer(listener: L): Option[Timer] = None
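
CopyOnWriteArrayList fits this workload because listeners are added rarely but iterated on every event: its iterator walks an immutable snapshot, so postToAll can iterate while another thread registers a listener, with no locks and no ConcurrentModificationException. A quick standalone illustration:

import java.util.concurrent.CopyOnWriteArrayList

object CowDemo extends App {
  val list = new CopyOnWriteArrayList[String]()
  list.add("a"); list.add("b")
  val it = list.iterator()  // iterates over a snapshot of the current array
  list.add("c")             // mutation copies the array; the live iterator is unaffected
  while (it.hasNext) print(it.next() + " ")  // prints: a b
}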

StreamingListenerBus

Once the StreamingListenerBus starts, it registers itself with the LiveListenerBus; the sparkListenerBus field below is the LiveListenerBus. Because StreamingListenerBus extends SparkListener, the StreamingListenerBus is itself just another Listener from the LiveListenerBus's point of view.

def start(): Unit = {
  // sparkListenerBus is the LiveListenerBus
  sparkListenerBus.addToStatusQueue(this)
}
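
addToStatusQueue is a thin wrapper around the addToQueue shown next. A sketch based on this Spark version's LiveListenerBus, whose companion object defines a fixed set of queue names:

// queue names from the LiveListenerBus companion object
private[scheduler] val SHARED_QUEUE = "shared"
private[scheduler] val APP_STATUS_QUEUE = "appStatus"
private[scheduler] val EXECUTOR_MANAGEMENT_QUEUE = "executorManagement"
private[scheduler] val EVENT_LOG_QUEUE = "eventLog"

// so the StreamingListenerBus lands in the "appStatus" queue
def addToStatusQueue(listener: SparkListenerInterface): Unit = {
  addToQueue(listener, APP_STATUS_QUEUE)
}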

LiveListenerBus

// When registering with the LiveListenerBus, the listener is placed into the AsyncEventQueue whose
// name matches the requested queue; the AsyncEventQueues themselves live in a CopyOnWriteArrayList.
// So in this example, the listener inside the AsyncEventQueue is the StreamingListenerBus.

private[spark] def addToQueue(
    listener: SparkListenerInterface,
    queue: String): Unit = synchronized {
  if (stopped.get()) {
    throw new IllegalStateException("LiveListenerBus is stopped.")
  }

  queues.asScala.find(_.name == queue) match {
    case Some(queue) =>
      queue.addListener(listener)

    case None =>
      val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
      // this addListener is still ListenerBus.addListener
      newQueue.addListener(listener)
      if (started.get()) {
        newQueue.start(sparkContext)
      }
      queues.add(newQueue)
  }
}

ListenerBus

AsyncEventQueue also extends ListenerBus, so it inherits the same addListener shown above; this is the same method that was used when registering the listener with the StreamingContext and with the StreamingListenerBus:

final def addListener(listener: L): Unit = {
  listenersPlusTimers.add((listener, getTimer(listener)))
}

This completes listener registration.

JobScheduler

Now for the producing side. When a streaming job starts, events begin to flow:

private def handleJobStart(job: Job, startTime: Long) {
  val jobSet = jobSets.get(job.time)
  val isFirstJobOfJobSet = !jobSet.hasStarted
  jobSet.handleJobStart(job)
  if (isFirstJobOfJobSet) {
    // "StreamingListenerBatchStarted" should be posted after calling "handleJobStart" to get the
    // correct "jobSet.processingStartTime".
    // post the BatchStarted event, carrying the batch information
    listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
  }
  job.setStartTime(startTime)
  listenerBus.post(StreamingListenerOutputOperationStarted(job.toOutputOperationInfo))
  logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
}

StreamingListenerBus

// The event gets wrapped in an extra layer here:

def post(event: StreamingListenerEvent) {
  sparkListenerBus.post(new WrappedStreamingListenerEvent(event))
}
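
The wrapper is a small case class defined in the same StreamingListenerBus file. It lets a StreamingListenerEvent travel through the SparkListenerEvent-typed LiveListenerBus, and it opts out of event logging because the history server does not support streaming events (SPARK-12140):

private case class WrappedStreamingListenerEvent(streamingListenerEvent: StreamingListenerEvent)
  extends SparkListenerEvent {

  // do not write streaming events to the event log
  protected[spark] override def logEvent: Boolean = false
}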

LiveListenerBus

/** Post an event to all queues. */
def post(event: SparkListenerEvent): Unit = {
  if (stopped.get()) {
    return
  }

  metrics.numEventsPosted.inc()

  // If the event buffer is null, it means the bus has been started and we can avoid
  // synchronization and post events directly to the queues. This should be the most
  // common case during the life of the bus.
  // once the bus has started, queuedEvents is null, so this is the usual path
  if (queuedEvents == null) {
    postToQueues(event)
    return
  }

  // Otherwise, need to synchronize to check whether the bus is started, to make sure the thread
  // calling start() picks up the new event.
  synchronized {
    if (!started.get()) {
      queuedEvents += event
      return
    }
  }

  // If the bus was already started when the check above was made, just post directly to the
  // queues.
  postToQueues(event)
}
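
Why queuedEvents can be null: events posted before the bus starts are buffered, and start() replays them into the queues, then drops the buffer. A simplified sketch of LiveListenerBus.start from this Spark version:

def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = synchronized {
  if (!started.compareAndSet(false, true)) {
    throw new IllegalStateException("LiveListenerBus already started.")
  }
  this.sparkContext = sc
  queues.asScala.foreach { q =>
    q.start(sc)
    queuedEvents.foreach(q.post)  // replay events buffered before start()
  }
  queuedEvents = null             // from now on post() goes straight to the queues
  metricsSystem.registerSource(metrics)
}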

LiveListenerBus

post then calls postToQueues; queues holds the AsyncEventQueues, one per queue name:

private def postToQueues(event: SparkListenerEvent): Unit = {
  val it = queues.iterator()
  while (it.hasNext()) {
    // iterate over all AsyncEventQueues and post the event to each
    it.next().post(event)
  }
}

AsyncEventQueue

The event is first placed into a queue (a producer-consumer model):

def post(event: SparkListenerEvent): Unit = {
  if (stopped.get()) {
    return
  }

  eventCount.incrementAndGet()
  // offer the event into the internal event queue
  if (eventQueue.offer(event)) {
    return
  }

  eventCount.decrementAndGet()
  droppedEvents.inc()
  droppedEventsCounter.incrementAndGet()
  if (logDroppedEvent.compareAndSet(false, true)) {
    // Only log the following message once to avoid duplicated annoying logs.
    logError(s"Dropping event from queue $name. " +
      "This likely means one of the listeners is too slow and cannot keep up with " +
      "the rate at which tasks are being started by the scheduler.")
  }
  logTrace(s"Dropping event $event")

  val droppedCount = droppedEventsCounter.get
  if (droppedCount > 0) {
    // Don't log too frequently
    if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
      // There may be multiple threads trying to decrease droppedEventsCounter.
      // Use "compareAndSet" to make sure only one thread can win.
      // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
      // then that thread will update it.
      if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {
        val prevLastReportTimestamp = lastReportTimestamp
        lastReportTimestamp = System.currentTimeMillis()
        val previous = new java.util.Date(prevLastReportTimestamp)
        logWarning(s"Dropped $droppedCount events from $name since $previous.")
      }
    }
  }
}
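
The queue behind this producer-consumer model is bounded. A sketch of the field from this Spark version's AsyncEventQueue; the capacity comes from spark.scheduler.listenerbus.eventqueue.capacity (10000 by default):

// a bounded blocking queue: when it is full, the offer() above returns false,
// the event is dropped, and droppedEventsCounter is incremented
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
  conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))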

On the consuming side: when the AsyncEventQueue starts, it launches a daemon thread that runs dispatch:

private val dispatchThread = new Thread(s"spark-listener-group-$name") {
  setDaemon(true)
  override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
    dispatch()
  }
}
private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
  var next: SparkListenerEvent = eventQueue.take()
  // loop until the POISON_PILL sentinel arrives
  while (next != POISON_PILL) {
    val ctx = processingTime.time()
    try {
      // AsyncEventQueue extends SparkListenerBus, which extends ListenerBus,
      // so this goes back to ListenerBus.postToAll
      super.postToAll(next)
    } finally {
      ctx.stop()
    }
    eventCount.decrementAndGet()
    next = eventQueue.take()
  }
  eventCount.decrementAndGet()
}
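
POISON_PILL is the shutdown sentinel: stop() enqueues it so the blocking take() wakes up and dispatch exits its loop. A simplified sketch from the same class:

// a marker event; dispatch() exits when it takes this from the queue
private val POISON_PILL = new SparkListenerEvent() { }

private[scheduler] def stop(): Unit = {
  if (stopped.compareAndSet(false, true)) {
    eventCount.incrementAndGet()
    eventQueue.put(POISON_PILL)
  }
  dispatchThread.join()  // wait until all queued events have been delivered
}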

ListenerBus

postToAll iterates over the registered listeners and delivers the event to each:

def postToAll(event: E): Unit = {
  // JavaConverters can create a JIterableWrapper if we use asScala.
  // However, this method will be called frequently. To avoid the wrapper cost, here we use
  // Java Iterator directly.
  // at this level, listenersPlusTimers holds the StreamingListenerBus
  val iter = listenersPlusTimers.iterator
  while (iter.hasNext) {
    val listenerAndMaybeTimer = iter.next()
    val listener = listenerAndMaybeTimer._1
    val maybeTimer = listenerAndMaybeTimer._2
    val maybeTimerContext = if (maybeTimer.isDefined) {
      maybeTimer.get.time()
    } else {
      null
    }
    try {
      // key point: here the listener is the StreamingListenerBus
      doPostEvent(listener, event)
      if (Thread.interrupted()) {
        // We want to throw the InterruptedException right away so we can associate the interrupt
        // with this listener, as opposed to waiting for a queue.take() etc. to detect it.
        throw new InterruptedException()
      }
    } catch {
      case ie: InterruptedException =>
        logError(s"Interrupted while posting to ${Utils.getFormattedClassName(listener)}.  " +
          s"Removing that listener.", ie)
        removeListenerOnError(listener)
      case NonFatal(e) if !isIgnorableException(e) =>
        logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
    } finally {
      if (maybeTimerContext != null) {
        maybeTimerContext.stop()
      }
    }
  }
}

SparkListenerBus

AsyncEventQueue extends SparkListenerBus, so the doPostEvent invoked here is SparkListenerBus's:

// The key point here: the listener is the StreamingListenerBus.
// The event we originally posted is wrapped in a WrappedStreamingListenerEvent, so none of the
// cases below match; it falls through to the default case, i.e. StreamingListenerBus.onOtherEvent.
protected override def doPostEvent(
    listener: SparkListenerInterface,
    event: SparkListenerEvent): Unit = {
  event match {
    case stageSubmitted: SparkListenerStageSubmitted =>
      listener.onStageSubmitted(stageSubmitted)
    case stageCompleted: SparkListenerStageCompleted =>
      listener.onStageCompleted(stageCompleted)
    case jobStart: SparkListenerJobStart =>
      listener.onJobStart(jobStart)
    case jobEnd: SparkListenerJobEnd =>
      listener.onJobEnd(jobEnd)
    case taskStart: SparkListenerTaskStart =>
      listener.onTaskStart(taskStart)
    case taskGettingResult: SparkListenerTaskGettingResult =>
      listener.onTaskGettingResult(taskGettingResult)
    case taskEnd: SparkListenerTaskEnd =>
      listener.onTaskEnd(taskEnd)
    case environmentUpdate: SparkListenerEnvironmentUpdate =>
      listener.onEnvironmentUpdate(environmentUpdate)
    case blockManagerAdded: SparkListenerBlockManagerAdded =>
      listener.onBlockManagerAdded(blockManagerAdded)
    case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
      listener.onBlockManagerRemoved(blockManagerRemoved)
    case unpersistRDD: SparkListenerUnpersistRDD =>
      listener.onUnpersistRDD(unpersistRDD)
    case applicationStart: SparkListenerApplicationStart =>
      listener.onApplicationStart(applicationStart)
    case applicationEnd: SparkListenerApplicationEnd =>
      listener.onApplicationEnd(applicationEnd)
    case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
      listener.onExecutorMetricsUpdate(metricsUpdate)
    case executorAdded: SparkListenerExecutorAdded =>
      listener.onExecutorAdded(executorAdded)
    case executorRemoved: SparkListenerExecutorRemoved =>
      listener.onExecutorRemoved(executorRemoved)
    case executorBlacklistedForStage: SparkListenerExecutorBlacklistedForStage =>
      listener.onExecutorBlacklistedForStage(executorBlacklistedForStage)
    case nodeBlacklistedForStage: SparkListenerNodeBlacklistedForStage =>
      listener.onNodeBlacklistedForStage(nodeBlacklistedForStage)
    case executorBlacklisted: SparkListenerExecutorBlacklisted =>
      listener.onExecutorBlacklisted(executorBlacklisted)
    case executorUnblacklisted: SparkListenerExecutorUnblacklisted =>
      listener.onExecutorUnblacklisted(executorUnblacklisted)
    case nodeBlacklisted: SparkListenerNodeBlacklisted =>
      listener.onNodeBlacklisted(nodeBlacklisted)
    case nodeUnblacklisted: SparkListenerNodeUnblacklisted =>
      listener.onNodeUnblacklisted(nodeUnblacklisted)
    case blockUpdated: SparkListenerBlockUpdated =>
      listener.onBlockUpdated(blockUpdated)
    case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
      listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
    // key point: unmatched events fall through to onOtherEvent
    case _ => listener.onOtherEvent(event)
  }
}

StreamingListenerBus

override def onOtherEvent(event: SparkListenerEvent): Unit = {
  event match {
    case WrappedStreamingListenerEvent(e) =>
      // unwrap the event and return to ListenerBus.postToAll, this time on the StreamingListenerBus
      postToAll(e)
    case _ =>
  }
}

ListenerBus

This time the iteration is over the user's own listeners:

def postToAll(event: E): Unit = {
  // JavaConverters can create a JIterableWrapper if we use asScala.
  // However, this method will be called frequently. To avoid the wrapper cost, here we use
  // Java Iterator directly.
  // this time listenersPlusTimers holds the user-defined listeners
  val iter = listenersPlusTimers.iterator
  while (iter.hasNext) {
    val listenerAndMaybeTimer = iter.next()
    val listener = listenerAndMaybeTimer._1
    val maybeTimer = listenerAndMaybeTimer._2
    val maybeTimerContext = if (maybeTimer.isDefined) {
      maybeTimer.get.time()
    } else {
      null
    }
    try {
      // key point: the listener is the user's own Listener. We are now inside the
      // StreamingListenerBus, which overrides doPostEvent
      doPostEvent(listener, event)
      if (Thread.interrupted()) {
        // We want to throw the InterruptedException right away so we can associate the interrupt
        // with this listener, as opposed to waiting for a queue.take() etc. to detect it.
        throw new InterruptedException()
      }
    } catch {
      case ie: InterruptedException =>
        logError(s"Interrupted while posting to ${Utils.getFormattedClassName(listener)}.  " +
          s"Removing that listener.", ie)
        removeListenerOnError(listener)
      case NonFatal(e) if !isIgnorableException(e) =>
        logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
    } finally {
      if (maybeTimerContext != null) {
        maybeTimerContext.stop()
      }
    }
  }
}

StreamingListenerBus

Each matched event is passed to the corresponding listener callback:

protected override def doPostEvent(
    listener: StreamingListener,
    event: StreamingListenerEvent): Unit = {
  event match {
    case receiverStarted: StreamingListenerReceiverStarted =>
      listener.onReceiverStarted(receiverStarted)
    case receiverError: StreamingListenerReceiverError =>
      listener.onReceiverError(receiverError)
    case receiverStopped: StreamingListenerReceiverStopped =>
      listener.onReceiverStopped(receiverStopped)
    case batchSubmitted: StreamingListenerBatchSubmitted =>
      listener.onBatchSubmitted(batchSubmitted)
    case batchStarted: StreamingListenerBatchStarted =>
      listener.onBatchStarted(batchStarted)
    case batchCompleted: StreamingListenerBatchCompleted =>
      listener.onBatchCompleted(batchCompleted)
    case outputOperationStarted: StreamingListenerOutputOperationStarted =>
      listener.onOutputOperationStarted(outputOperationStarted)
    case outputOperationCompleted: StreamingListenerOutputOperationCompleted =>
      listener.onOutputOperationCompleted(outputOperationCompleted)
    case streamingStarted: StreamingListenerStreamingStarted =>
      listener.onStreamingStarted(streamingStarted)
    case _ =>
  }
}

To summarize: for registration, the user's Listener goes into the StreamingListenerBus, the StreamingListenerBus goes into the LiveListenerBus, and the LiveListenerBus wraps it in an AsyncEventQueue. Delivery then runs the other way: LiveListenerBus -> AsyncEventQueue -> StreamingListenerBus -> the user's Listener.
