(版本定制)第11课:Spark Streaming源码解读之Driver中的ReceiverTracker彻底研究和思考

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了(版本定制)第11课:Spark Streaming源码解读之Driver中的ReceiverTracker彻底研究和思考相关的知识,希望对你有一定的参考价值。

本期内容:

    1、ReceiverTracker的架构设计

    2、消息循环系统

    3、ReceiverTracker具体实现


上节课讲到了Receiver是如何不断的接收数据的,并且接收到的数据的元数据会汇报给ReceiverTracker,下面我们看看ReceiverTracker具体的功能及实现。

ReceiverTracker主要的功能:

  1. 在Executor上启动Receivers。

  2. 停止Receivers 。

  3. 更新Receiver接收数据的速度(也就是限流)

  4. 不断的等待Receivers的运行状态,只要Receivers停止运行,就重新启动Receiver,也就是Receiver的容错功能。

  5. 接受Receiver的注册。

  6. 借助ReceivedBlockTracker来管理Receiver接收数据的元数据。

  7. 汇报Receiver发送过来的错误信息


ReceiverTracker 管理了一个消息通讯体ReceiverTrackerEndpoint,用来与Receiver或者ReceiverTracker 进行消息通信。

在ReceiverTracker的start方法中,实例化了ReceiverTrackerEndpoint,并且在Executor上启动Receivers。

启动Receivr,其实是ReceiverTracker给ReceiverTrackerEndpoint发送了一个本地消息,ReceiverTrackerEndpoint将Receiver封装成RDD以job的方式提交给集群运行。

Receiver启动后,会向ReceiverTracker注册,注册成功才算正式启动了。

当Receiver端接收到数据,达到一定的条件需要将数据写入BlockManager,并且将数据的元数据汇报给ReceiverTracker。

/** Store block and report it to driver */
def pushAndReportBlock(
   receivedBlock: ReceivedBlock
,
metadataOption: Option[Any],
blockIdOption: Option[StreamBlockId]
 ) {
val blockId = blockIdOption.getOrElse(nextBlockId)
val time = System.currentTimeMillis
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
 logDebug(
s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
val numRecords = blockStoreResult.numRecords
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
 logDebug(
s"Reported block $blockId")
}

当ReceiverTracker收到元数据后,会在线程池中启动一个线程来写数据

case AddBlock(receivedBlockInfo) =>
if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
walBatchingThreadPool.execute(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
if (active) {
         context.reply(
addBlock(receivedBlockInfo))
       }
else {
throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
       }
     }
   })
 }
else {
   context.reply(addBlock(receivedBlockInfo))
 }

数据的元数据是交由ReceivedBlockTracker管理的

数据最终被写入到streamIdToUnallocatedBlockQueues中,一个流对应一个数据块信息的队列。

每当Streaming 触发job时,会将队列中的数据分配成一个batch,并将数据写入timeToAllocatedBlocks数据结构。

下面是简单的流程图:

技术分享

参考博客:http://blog.csdn.net/hanburgud/article/details/51471074

http://lqding.blog.51cto.com/9123978/1774994


备注:

资料来源于:DT_大数据梦工厂(Spark发行版本定制)

更多私密内容,请关注微信公众号:DT_Spark

如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上2000开设的Spark永久免费公开课,地址YY房间号:68917580


本文出自 “DT_Spark大数据梦工厂” 博客,请务必保留此出处http://18610086859.blog.51cto.com/11484530/1776027

以上是关于(版本定制)第11课:Spark Streaming源码解读之Driver中的ReceiverTracker彻底研究和思考的主要内容,如果未能解决你的问题,请参考以下文章

(版本定制)第12课:Spark Streaming源码解读之Executor容错安全性

(版本定制)第16课:Spark Streaming源码解读之数据清理内幕彻底解密

(版本定制)第14课:Spark Streaming源码解读之State管理之updateStateByKey和mapWithState解密

(版本定制)第17课:Spark Streaming资源动态申请和动态控制消费速率原理剖析

(版本定制)第15课:Spark Streaming源码解读之No Receivers彻底思考

(版本定制)第10课:Spark Streaming源码解读之流数据不断接收全生命周期彻底研究和思考