Lesson 10: Spark Streaming Source Code Analysis - A Thorough Study of the Full Lifecycle of Continuous Stream Data Reception

Posted by michaelli916

1. The design pattern behind the data-receiving architecture
2. A thorough study of the data-receiving source code

Spark Streaming's data reception has two defining characteristics:
First, data is received continuously, without stopping.
Second, the receiver and the driver generally do not run in the same process. As data arrives, the receiver keeps reporting to the driver, and the driver uses the metadata of the received data to drive scheduling.

Architecturally, there is a loop that continuously receives data, stores it, and reports to the driver; receiving and storing are handled by different objects. The receiver is roughly the M in an MVC architecture; the ReceiverSupervisor is roughly the C (the receiver is started by the ReceiverSupervisor and hands received data to the ReceiverSupervisor for storage, which is what makes the ReceiverSupervisor the controller); and the driver is roughly the V, presenting and consuming the data.
The metadata acts like a pointer, except that it needs its own set of parsing rules.

ReceiverTracker starts receivers by submitting one job after another: each job consists of a single task, and that task runs a single function whose only purpose is to build a ReceiverSupervisor and start the receiver.

When ReceiverTracker starts, it also starts a ReceiverTrackerEndpoint, which is used for status reporting while the job is running.

  // endpoint is created when generator starts.
  // This not being null means the tracker has been started and not stopped
  private var endpoint: RpcEndpointRef = null
  /** Start the endpoint and receiver execution thread. */
  def start(): Unit = synchronized {
    if (isTrackerStarted) {
      throw new SparkException("ReceiverTracker already started")
    }

    if (!receiverInputStreams.isEmpty) {
      endpoint = ssc.env.rpcEnv.setupEndpoint(
        "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
      if (!skipReceiverLaunch) launchReceivers()
      logInfo("ReceiverTracker started")
      trackerState = Started
    }
  }
  /**
   * Get the receivers from the ReceiverInputDStreams, distributes them to the
   * worker nodes as a parallel collection, and runs them.
   */
  private def launchReceivers(): Unit = {
  // The receivers obtained from the InputDStreams (created for the input sources) are driver-side objects: each is just a reference handle, a serializable description of metadata, which must be shipped to an executor
    val receivers = receiverInputStreams.map(nis => {
      val rcvr = nis.getReceiver()
      rcvr.setReceiverId(nis.id)
      rcvr
    })

    // Run a dummy Spark job to make sure the workers are alive, so the receivers can be load-balanced across them
    runDummySparkJob()

    logInfo("Starting " + receivers.length + " receivers")
    // Send the StartAllReceivers message to the endpoint
    endpoint.send(StartAllReceivers(receivers))
  }

Handling of the message once it is received:

      case StartAllReceivers(receivers) =>
        val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
        // Loop over the receivers, handling one per iteration
        for (receiver <- receivers) {
          val executors = scheduledLocations(receiver.streamId)
          updateReceiverScheduledExecutors(receiver.streamId, executors)
          receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
          startReceiver(receiver, executors)
        }

The key method is startReceiver(receiver, executors):

/**
     * Start a receiver along with its scheduled executors
     */
    private def startReceiver(
        receiver: Receiver[_],
        scheduledLocations: Seq[TaskLocation]): Unit = {
      def shouldStartReceiver: Boolean = {
        // It's okay to start when trackerState is Initialized or Started
        !(isTrackerStopping || isTrackerStopped)
      }

      val receiverId = receiver.streamId
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
        return
      }

      val checkpointDirOption = Option(ssc.checkpointDir)
      val serializableHadoopConf =
        new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

      // Function to start the receiver on the worker node
      // This wrapped function is what actually starts the receiver
      // A function run over an RDD partition takes an iterator of the partition's elements
      // Here the iterator's underlying collection contains exactly one receiver
      val startReceiverFunc: Iterator[Receiver[_]] => Unit =
        (iterator: Iterator[Receiver[_]]) => {
          if (!iterator.hasNext) {
            throw new SparkException(
              "Could not start receiver as object not found.")
          }
          if (TaskContext.get().attemptNumber() == 0) {
            val receiver = iterator.next()
            assert(iterator.hasNext == false)
            // Create the supervisor (the controller) for this receiver
            val supervisor = new ReceiverSupervisorImpl(
              receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
            supervisor.start()
            supervisor.awaitTermination()
          } else {
            // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
          }
        }

      // Create the RDD using the scheduledLocations to run the receiver in a Spark job
      // Build an RDD whose sole purpose is to carry the receiver; like any other RDD, data locality must be taken into account
      val receiverRDD: RDD[Receiver[_]] =
        if (scheduledLocations.isEmpty) {
        // Only one receiver, in a single partition
          ssc.sc.makeRDD(Seq(receiver), 1)
        } else {
          val preferredLocations = scheduledLocations.map(_.toString).distinct
          ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
        }
      receiverRDD.setName(s"Receiver $receiverId")
      ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
      ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
      // Submit a job to start the receiver; the RDD is executed as a task on a concrete worker node
      val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
        receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
      // We will keep restarting the receiver job until ReceiverTracker is stopped
      future.onComplete {
        case Success(_) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            logInfo(s"Restarting Receiver $receiverId")
            self.send(RestartReceiver(receiver))
          }
        case Failure(e) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            logError("Receiver has been stopped. Try to restart it.", e)
            logInfo(s"Restarting Receiver $receiverId")
            self.send(RestartReceiver(receiver))
          }
      }(submitJobThreadPool)
      logInfo(s"Receiver ${receiver.streamId} started")
    }

The controller, ReceiverSupervisorImpl, is responsible for handling the data the receiver receives: it stores the data and reports it to the driver.
A receiver receives data in a loop, one record at a time, whether the source is Kafka, a socket, or key-value style records.

/**
 * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
 * which provides all the necessary functionality for handling the data received by
 * the receiver. Specifically, it creates a [[org.apache.spark.streaming.receiver.BlockGenerator]]
 * object that is used to divide the received data stream into blocks of data.
 */
private[streaming] class ReceiverSupervisorImpl(
    receiver: Receiver[_],
    env: SparkEnv,
    hadoopConf: Configuration,
    checkpointDirOption: Option[String]
  ) extends ReceiverSupervisor(receiver, env.conf) with Logging {

ReceiverSupervisorImpl is created in ReceiverTracker's startReceiver() method (which is ultimately triggered by ReceiverTracker's start() method). Looking at its constructor, we can see that when ReceiverSupervisorImpl is created it acquires a reference to ReceiverTracker's endpoint:

  /** Remote RpcEndpointRef for the ReceiverTracker */
  // What this ultimately obtains is a reference to the ReceiverTracker's endpoint on the driver
  private val trackerEndpoint = RpcUtils.makeDriverRef("ReceiverTracker", env.conf, env.rpcEnv)
  // Create a new RPC endpoint; it lives inside ReceiverSupervisorImpl, i.e. on the executor, and receives messages sent from the driver
  /** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */
  private val endpoint = env.rpcEnv.setupEndpoint(
    "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
      override val rpcEnv: RpcEnv = env.rpcEnv

      override def receive: PartialFunction[Any, Unit] = {
        case StopReceiver =>
          logInfo("Received stop signal")
          ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
          // Important: after each batch of data is processed, the driver sends this message to clean up the old data blocks
        case CleanupOldBlocks(threshTime) =>
          logDebug("Received delete old batch signal")
          cleanupOldBlocks(threshTime)
          // Key point: this message is sent to dynamically adjust the rate at which data is received, i.e. rate limiting
        case UpdateRateLimit(eps) =>
          logInfo(s"Received a new rate limit: $eps.")
          registeredBlockGenerators.foreach { bg =>
            bg.updateRate(eps)
          }
      }
    })
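
The UpdateRateLimit message handled above is the hook behind Spark Streaming's receiver rate limiting and backpressure. As a rough sketch of the user-facing side (the configuration keys below are standard Spark Streaming settings, but the concrete values are only examples), this is how one might cap or dynamically adjust the ingestion rate:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: limiting receiver ingestion.
// "spark.streaming.receiver.maxRate" is a static per-receiver cap in records per second.
// "spark.streaming.backpressure.enabled" lets the driver compute a new rate after each batch
// and push it to the receivers via the UpdateRateLimit message seen above.
val conf = new SparkConf()
  .setAppName("RateLimitedStreaming")
  .set("spark.streaming.receiver.maxRate", "10000")
  .set("spark.streaming.backpressure.enabled", "true")

val ssc = new StreamingContext(conf, Seconds(2))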

So both ReceiverTracker and ReceiverSupervisorImpl carry an RPC endpoint internally.
After the data has been received record by record, it needs to be packed into blocks, and processing then happens one block at a time.
Let's look at ReceiverSupervisorImpl's start() method and the methods it uses (some of which live in its parent class, ReceiverSupervisor):

  /** Start the supervisor */
  def start() {
    onStart()
    startReceiver()
  }
  override protected def onStart() {
    registeredBlockGenerators.foreach { _.start() }
  }
  private val registeredBlockGenerators = new mutable.ArrayBuffer[BlockGenerator]
    with mutable.SynchronizedBuffer[BlockGenerator]

BlockGenerator starts two threads: one merges the data received within each block interval into a block, and the other hands the completed blocks to the BlockManager for storage:

/**
 * Generates batches of objects received by a
 * [[org.apache.spark.streaming.receiver.Receiver]] and puts them into appropriately
 * named blocks at regular intervals. This class starts two threads,
 * one to periodically start a new batch and prepare the previous batch of data as a block,
 * the other to push the blocks into the block manager.
 *
 // The ReceiverSupervisor is started before the receivers. BlockGenerator runs two threads, and
 // thread startup has some latency; to make sure those threads are ready before the receiver
 // starts handing over data, the BlockGenerator is created in the ReceiverSupervisor rather than
 // inside the receivers.
 // Note that BlockGenerator extends RateLimiter: this is where rate limiting is enforced.
 * Note: Do not create BlockGenerator instances directly inside receivers. Use
 * `ReceiverSupervisor.createBlockGenerator` to create a BlockGenerator and use it.
 */
private[streaming] class BlockGenerator(
    listener: BlockGeneratorListener,
    receiverId: Int,
    conf: SparkConf,
    clock: Clock = new SystemClock()
  ) extends RateLimiter(conf) with Logging {

The BlockGenerator is created when ReceiverSupervisorImpl is instantiated.

  private val registeredBlockGenerators = new mutable.ArrayBuffer[BlockGenerator]
    with mutable.SynchronizedBuffer[BlockGenerator]

  /** Divides received data records into data blocks for pushing in BlockManager. */
  private val defaultBlockGeneratorListener = new BlockGeneratorListener {
    def onAddData(data: Any, metadata: Any): Unit = { }

    def onGenerateBlock(blockId: StreamBlockId): Unit = { }

    def onError(message: String, throwable: Throwable) {
      reportError(message, throwable)
    }

    def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
      pushArrayBuffer(arrayBuffer, None, Some(blockId))
    }
  }
  private val defaultBlockGenerator = createBlockGenerator(defaultBlockGeneratorListener)

One BlockGenerator serves one stream.

  override def createBlockGenerator(
      blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
    // Cleanup BlockGenerators that have already been stopped
    registeredBlockGenerators --= registeredBlockGenerators.filter{ _.isStopped() }

    val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
    registeredBlockGenerators += newBlockGenerator
    newBlockGenerator
  }

ReceiverSupervisorImpl's start() method calls onStart() and startReceiver(), and onStart() calls BlockGenerator's start() method:

  /** Start block generating and pushing threads. */
  def start(): Unit = synchronized {
    if (state == Initialized) {
      state = Active
      blockIntervalTimer.start()
      blockPushingThread.start()
      logInfo("Started BlockGenerator")
    } else {
      throw new SparkException(
        s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
    }
  }

A recurring timer is used to keep packing the buffered data into blocks:

  private val blockIntervalTimer =
    new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
  private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }

ReceiverSupervisor's start() method calls onStart() and startReceiver(); now let's look at startReceiver():

  /** Start receiver */
  def startReceiver(): Unit = synchronized {
    try {
      if (onReceiverStart()) {
        logInfo("Starting receiver")
        receiverState = Started
        receiver.onStart()
        logInfo("Called receiver onStart")
      } else {
        // The driver refused us
        stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
      }
    } catch {
      case NonFatal(t) =>
        stop("Error starting receiver " + streamId, Some(t))
    }
  }

Now look at ReceiverSupervisorImpl's onReceiverStart():

 override protected def onReceiverStart(): Boolean = {
    val msg = RegisterReceiver(
      streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
      // askWithRetry keeps retrying until it gets a definite reply
      // Send the RegisterReceiver message to the ReceiverTrackerEndpoint
    trackerEndpoint.askWithRetry[Boolean](msg)
  }

When the ReceiverTrackerEndpoint receives the message, it routes it to registerReceiver() to register the receiver:

      // Remote messages
      case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
        val successful =
          registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
        context.reply(successful)
 /** Register a receiver */
  private def registerReceiver(
      streamId: Int,
      typ: String,
      host: String,
      executorId: String,
      receiverEndpoint: RpcEndpointRef,
      senderAddress: RpcAddress
    ): Boolean = {
    if (!receiverInputStreamIds.contains(streamId)) {
      throw new SparkException("Register received for unexpected id " + streamId)
    }

    if (isTrackerStopping || isTrackerStopped) {
      return false
    }

    val scheduledLocations = receiverTrackingInfos(streamId).scheduledLocations
    val acceptableExecutors = if (scheduledLocations.nonEmpty) {
        // This receiver is registering and it's scheduled by
        // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledLocations" to check it.
        scheduledLocations.get
      } else {
        // This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling
        // "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.
        scheduleReceiver(streamId)
      }

    def isAcceptable: Boolean = acceptableExecutors.exists {
      case loc: ExecutorCacheTaskLocation => loc.executorId == executorId
      case loc: TaskLocation => loc.host == host
    }

    if (!isAcceptable) {
      // Refuse it since it's scheduled to a wrong executor
      false
    } else {
      val name = s"${typ}-${streamId}"
      val receiverTrackingInfo = ReceiverTrackingInfo(
        streamId,
        ReceiverState.ACTIVE,
        scheduledLocations = None,
        runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
        name = Some(name),
        endpoint = Some(receiverEndpoint))
      receiverTrackingInfos.put(streamId, receiverTrackingInfo)
      listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
      logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
      true
    }
  }

The startReceiver() method invoked from ReceiverSupervisor's start() method calls receiver.onStart():

  /**
   * This method is called by the system when the receiver is started. This function
   * must initialize all resources (threads, buffers, etc.) necessary for receiving data.
   * This function must be non-blocking, so receiving the data must occur on a different
   * thread. Received data can be stored with Spark by calling `store(data)`.
   *
   * If there are errors in threads started here, then following options can be done
   * (i) `reportError(...)` can be called to report the error to the driver.
   * The receiving of data will continue uninterrupted.
   * (ii) `stop(...)` can be called to stop receiving data. This will call `onStop()` to
   * clear up all resources allocated (threads, buffers, etc.) during `onStart()`.
   * (iii) `restart(...)` can be called to restart the receiver. This will call `onStop()`
   * immediately, and then `onStart()` after a delay.
   */
  def onStart()

Let's look at the implementation in a concrete receiver subclass, SocketReceiver:

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() }
    }.start()
  }

It starts a thread that runs receive(), which keeps receiving data:

/** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    var socket: Socket = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      socket = new Socket(host, port)
      logInfo("Connected to " + host + ":" + port)
      val iterator = bytesToObjects(socket.getInputStream())
      while(!isStopped && iterator.hasNext) {
        // Hand the record to the supervisor for storage
        store(iterator.next)
      }
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case NonFatal(e) =>
        logWarning("Error receiving data", e)
        restart("Error receiving data", e)
    } finally {
      if (socket != null) {
        socket.close()
        logInfo("Closed socket to " + host + ":" + port)
      }
    }
  }

The actual storage of the data is done through ReceiverSupervisorImpl:

  /**
   * Store a single item of received data to Spark's memory.
   * These single items will be aggregated together into data blocks before
   * being pushed into Spark's memory.
   */
  def store(dataItem: T) {
    supervisor.pushSingle(dataItem)
  }
  /** Push a single record of received data into block generator. */
  def pushSingle(data: Any) {
    defaultBlockGenerator.addData(data)
  }
BlockGenerator.addData:

  /**
   * Push a single data item into the buffer.
   */
  def addData(data: Any): Unit = {
    if (state == Active) {
      waitToPush()
      // Synchronized with the timer thread that swaps out currentBuffer
      synchronized {
        if (state == Active) {
          currentBuffer += data
        } else {
          throw new SparkException(
            "Cannot add data as BlockGenerator has not been started or has been stopped")
        }
      }
    } else {
      throw new SparkException(
        "Cannot add data as BlockGenerator has not been started or has been stopped")
    }
  }

A recurring timer keeps turning the buffered data into blocks, one at a time.
By default Spark generates a block every 200 ms. This is configurable, but it is recommended not to go below 50 ms: each block is handed to one task for computation, so blocks that are too small just add task-launch overhead. For example, with a 2-second batch interval and the default 200 ms block interval, each receiver contributes about 10 blocks, and therefore about 10 tasks, per batch.
BlockGenerator.scala:

  private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
  require(blockIntervalMs > 0, s"'spark.streaming.blockInterval' should be a positive value")

  private val blockIntervalTimer =
    new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
  private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
  private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
  private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
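
Tying this back to the tuning advice above, here is a minimal sketch of how these two settings could be adjusted when building the SparkConf (the values are purely illustrative):

import org.apache.spark.SparkConf

// Sketch: tuning block generation. With a 2 s batch interval and a 100 ms block
// interval, each receiver produces roughly 2000 / 100 = 20 blocks, and therefore
// roughly 20 partitions/tasks, per batch.
val conf = new SparkConf()
  .setAppName("BlockIntervalTuning")
  .set("spark.streaming.blockInterval", "100ms")  // default is 200ms; keep it >= 50ms
  .set("spark.streaming.blockQueueSize", "10")    // capacity of the blocksForPushing queue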

Generating a block:

  /** Change the buffer to which single records are added to. */
  private def updateCurrentBuffer(time: Long): Unit = {
    try {
      var newBlock: Block = null
      synchronized {
        if (currentBuffer.nonEmpty) {
          val newBlockBuffer = currentBuffer
          currentBuffer = new ArrayBuffer[Any]
          val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
          listener.onGenerateBlock(blockId)
          newBlock = new Block(blockId, newBlockBuffer)
        }
      }

      if (newBlock != null) {
        blocksForPushing.put(newBlock)  // put is blocking when queue is full
      }
    } catch {
      case ie: InterruptedException =>
        logInfo("Block updating timer thread was interrupted")
      case e: Exception =>
        reportError("Error in block updating thread", e)
    }
  }

The block-pushing thread drains blocksForPushing and hands each block to the listener's onPushBlock callback (shown earlier), which calls pushArrayBuffer and eventually ReceiverSupervisorImpl.pushAndReportBlock():

  /** Store block and report it to driver */
  def pushAndReportBlock(
      receivedBlock: ReceivedBlock,
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    val blockId = blockIdOption.getOrElse(nextBlockId)
    val time = System.currentTimeMillis
    val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
    logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
    val numRecords = blockStoreResult.numRecords
    val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
    trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
    logDebug(s"Reported block $blockId")
  }

Summary: the receiver data-reception flow

Overall flow: when ReceiverSupervisor starts, it starts the BlockGenerator and the Receiver. The BlockGenerator first starts a timer that periodically packs the received data into blocks, and then starts a thread that keeps pushing those blocks to the BlockManager. Meanwhile, once started, the Receiver keeps receiving data and appending each record to the BlockGenerator's currentBuffer, from which the BlockGenerator keeps draining data; the whole reception process thus runs as a continuous loop.
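
To see this whole loop from the user's side, here is a minimal, hypothetical custom receiver (the class name and the records it emits are invented for illustration; it simply follows the Receiver API traced in this lesson): onStart() spawns a daemon thread, that thread calls store() for every record, and store() hands each record to the supervisor's BlockGenerator via pushSingle(), exactly as shown above.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver that emits one counter record per second.
class CounterReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // Must be non-blocking: start a daemon thread and return immediately.
  def onStart(): Unit = {
    new Thread("Counter Receiver") {
      setDaemon(true)
      override def run(): Unit = {
        var i = 0L
        while (!isStopped()) {
          // store() -> ReceiverSupervisorImpl.pushSingle -> BlockGenerator.addData
          store(s"record-$i")
          i += 1
          Thread.sleep(1000)
        }
      }
    }.start()
  }

  // The receiving thread checks isStopped(), so there is nothing extra to clean up here.
  def onStop(): Unit = { }
}

It would be wired in with something like ssc.receiverStream(new CounterReceiver), after which every stored record flows through the BlockGenerator, the BlockManager, and the AddBlock report to the ReceiverTracker described above.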


This lesson is based on Wang Jialin's course "Source Code Version Customization Release Class"; many thanks to Mr. Wang Jialin!
Everyone is welcome to exchange technical knowledge, learn together, and make progress together!
