EventLoop: The Event Loop Processor in Spark

Posted by javartisan


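The principle behind Spark's EventLoop is not hard to follow. I could probably write something similar myself, but I suspect my own version would be less extensible than Spark's, so I am writing this post to record how it works.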


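As the name suggests, an event loop processor exists to process events. Spark has the DAGSchedulerEventProcessLoop event processor, as well as an anonymous event processor implemented inside JobGenerator. The implementation is fairly simple: a single thread acts as the event-processing thread, and a LinkedBlockingDeque serves as the event buffer, holding received events until that thread handles them. The thread runs a while loop that keeps watching the queue until the loop is stopped, taking and processing each event as soon as one is available. If the queue is empty, the call to take() blocks at that point. The code is as follows: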

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.util

import java.util.concurrent.{BlockingQueue, LinkedBlockingDeque}
import java.util.concurrent.atomic.AtomicBoolean

import scala.util.control.NonFatal

import org.apache.spark.internal.Logging

/**
  * An event loop to receive events from the caller and process all events in the event thread. It
  * will start an exclusive event thread to process all events.
  *
  * Note: The event queue will grow indefinitely. So subclasses should make sure `onReceive` can
  * handle events in time to avoid the potential OOM.
  * <br><br>
  * A single internal thread is responsible for processing the events.
  *
  */
private[spark] abstract class EventLoop[E](name: String) extends Logging {

  /** Queue that buffers incoming events. */
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
  /** Flag that marks whether the event loop has been stopped. */
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            // As soon as an event is available, hand it to onReceive for processing
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }

  }

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }

  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      var onStopCalled = false
      try {
        eventThread.join()
        // Call onStop after the event thread exits to make sure onReceive happens before onStop
        onStopCalled = true
        onStop()
      } catch {
        case ie: InterruptedException =>
          Thread.currentThread().interrupt()
          if (!onStopCalled) {
            // ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since
            // it's already called.
            onStop()
          }
      }
    } else {
      // Keep quiet to allow calling `stop` multiple times.
    }
  }

  /**
    * Put the event into the event queue. The event thread will process it later.
    */
  def post(event: E): Unit = {
    eventQueue.put(event)
  }

  /**
    * Return if the event thread has already been started but not yet stopped.
    */
  def isActive: Boolean = eventThread.isAlive

  /**
    * Invoked when `start()` is called but before the event thread starts.
    * Lifecycle method.
    */
  protected def onStart(): Unit = {}

  /**
    * Invoked when `stop()` is called and the event thread exits.
    * Lifecycle method.
    */
  protected def onStop(): Unit = {}

  /**
    * Invoked in the event thread when polling events from the event queue.
    *
    * Note: Should avoid calling blocking actions in `onReceive`, or the event thread will be blocked
    * and cannot process events in time. If you want to call some blocking actions, run them in
    * another thread.
    * <br> Because each use case handles events differently, this is declared abstract and left
    * to concrete subclasses to implement.<br><br>
    * Lifecycle method.
    */
  protected def onReceive(event: E): Unit

  /**
    * Invoked if `onReceive` throws any non fatal error. Any non fatal error thrown from `onError`
    * will be ignored.
    *
    *  <br> Because each use case handles errors differently, this is declared abstract and left
    * to concrete subclasses to implement.
    */
  protected def onError(e: Throwable): Unit

}

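The Scaladoc on `onReceive` in the listing above warns against blocking inside the handler and suggests running blocking work on another thread. One possible way to follow that advice is sketched below. `OffloadingHandler` and `OffloadDemo` are hypothetical names invented for this illustration and are not part of Spark; the sketch only assumes the standard `java.util.concurrent` executor API.

```scala
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical handler (not a Spark class): its onReceive plays the role of
// EventLoop.onReceive and returns quickly by handing slow work to a pool.
class OffloadingHandler(workers: ExecutorService) {
  val completed = new AtomicInteger(0)

  def onReceive(event: String): Unit = {
    // Only the hand-off happens on the caller's (event) thread.
    workers.execute(new Runnable {
      override def run(): Unit = {
        Thread.sleep(50) // stands in for a blocking call such as I/O
        completed.incrementAndGet()
      }
    })
  }
}

object OffloadDemo {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(2)
    val handler = new OffloadingHandler(pool)

    // Each call returns without waiting for the 50 ms of simulated work.
    (1 to 4).foreach(i => handler.onReceive(s"event-$i"))

    // Let the queued tasks finish before reading the counter.
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
    println(s"completed=${handler.completed.get}")
  }
}
```

The trade-off is that work handed to the pool no longer runs in strict arrival order, so this pattern fits best when the slow part of handling an event does not depend on ordering.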

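In this implementation, the logic common to every event loop lives in EventLoop itself, while the behavior that differs from one use case to another is declared as abstract methods, so each concrete subclass only has to supply its own handling.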
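To make this split concrete, here is a minimal sketch. Because the real `EventLoop` is `private[spark]`, the sketch uses a trimmed-down stand-in named `MiniEventLoop` (an assumption for illustration; it leaves out logging and the `onStart`/`onStop` hooks). The `JobEvent` types and `JobEventLoop` subclass are likewise hypothetical and are not Spark classes.

```scala
import java.util.concurrent.{BlockingQueue, CountDownLatch, LinkedBlockingDeque, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import scala.util.control.NonFatal

// Hypothetical stand-in for Spark's EventLoop: shared queue/thread logic only.
abstract class MiniEventLoop[E](name: String) {
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take() // blocks while the queue is empty
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) => onError(e)
          }
        }
      } catch {
        case _: InterruptedException => // stop() interrupted the blocking take()
      }
    }
  }

  def start(): Unit = eventThread.start()

  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      eventThread.join()
    }
  }

  def post(event: E): Unit = eventQueue.put(event)

  // The parts that differ per use case are left abstract.
  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}

// Hypothetical event types and subclass, for illustration only.
sealed trait JobEvent
final case class JobSubmitted(id: Int) extends JobEvent
final case class JobFailed(id: Int) extends JobEvent

class JobEventLoop(expected: Int) extends MiniEventLoop[JobEvent]("job-event-loop") {
  val submitted = new AtomicInteger(0)
  val errors = new AtomicInteger(0)
  val done = new CountDownLatch(expected) // lets the demo wait for all events

  override protected def onReceive(event: JobEvent): Unit = event match {
    case JobSubmitted(_) =>
      submitted.incrementAndGet()
      done.countDown()
    case JobFailed(id) =>
      throw new IllegalStateException(s"job $id failed") // routed to onError
  }

  override protected def onError(e: Throwable): Unit = {
    errors.incrementAndGet()
    done.countDown()
  }
}

object EventLoopDemo {
  def main(args: Array[String]): Unit = {
    val loop = new JobEventLoop(expected = 3)
    loop.start()
    loop.post(JobSubmitted(1))
    loop.post(JobFailed(2))
    loop.post(JobSubmitted(3))
    loop.done.await(5, TimeUnit.SECONDS)
    loop.stop()
    println(s"submitted=${loop.submitted.get}, errors=${loop.errors.get}")
  }
}
```

In this sketch the subclass contains no threading or queueing code at all; it only decides what to do with each event and with each failure, which is the division of responsibility described above.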
