Lesson 10: Spark Streaming Source Code Analysis — A Thorough Study of the Full Lifecycle of Continuous Stream Data Reception
In the previous lesson we walked through how a Receiver gets started. The Receiver is launched via the start method of ReceiverSupervisor:
/** Start the supervisor */
def start() {
  onStart()
  startReceiver()
}
start() first calls the onStart() method, which is overridden in ReceiverSupervisorImpl:
override protected def onStart() {
  registeredBlockGenerators.foreach { _.start() }
}
registeredBlockGenerators is populated when ReceiverSupervisorImpl is instantiated, via createBlockGenerator:
private val defaultBlockGenerator = createBlockGenerator(defaultBlockGeneratorListener)

override def createBlockGenerator(
    blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
  // Cleanup BlockGenerators that have already been stopped
  registeredBlockGenerators --= registeredBlockGenerators.filter{ _.isStopped() }

  val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
  registeredBlockGenerators += newBlockGenerator
  newBlockGenerator
}
onStart() then calls the start method of each registered BlockGenerator:
/** Start block generating and pushing threads. */
def start(): Unit = synchronized {
  if (state == Initialized) {
    state = Active
    blockIntervalTimer.start()
    blockPushingThread.start()
    logInfo("Started BlockGenerator")
  } else {
    throw new SparkException(
      s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
  }
}
blockIntervalTimer is a timer that invokes the updateCurrentBuffer function every time the block interval elapses:
private val blockIntervalTimer =
  new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
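Conceptually, RecurringTimer just runs a callback at a fixed period on a dedicated thread. Below is a simplified, self-contained sketch of that idea; it is not Spark's actual RecurringTimer, and the class name SimpleRecurringTimer is made up for illustration:

// Simplified illustration of a recurring timer: invoke `callback` roughly every
// `periodMs` milliseconds on a dedicated thread. In BlockGenerator the callback
// is updateCurrentBuffer. This is a sketch, not Spark's RecurringTimer.
class SimpleRecurringTimer(periodMs: Long, callback: Long => Unit) {
  @volatile private var running = false

  private val thread = new Thread("simple-recurring-timer") {
    override def run(): Unit = {
      var nextTime = System.currentTimeMillis()
      while (running) {
        nextTime += periodMs
        val sleep = nextTime - System.currentTimeMillis()
        if (sleep > 0) Thread.sleep(sleep)   // wait out the remainder of the period
        callback(nextTime)                    // fire the callback with the scheduled time
      }
    }
  }

  def start(): Unit = { running = true; thread.start() }
  def stop(): Unit = { running = false }
}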
The interval defaults to 200 milliseconds:
private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
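Since spark.streaming.blockInterval is a user-facing setting, it can be tuned from the driver program. A minimal sketch, assuming you want more (and smaller) blocks per batch; the 100ms value and the application name are purely illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Shrink the block interval from the default 200 ms to 100 ms, so each batch
// is cut into more, smaller blocks (and therefore more partitions/tasks).
val conf = new SparkConf()
  .setAppName("BlockIntervalDemo")              // illustrative app name
  .set("spark.streaming.blockInterval", "100ms")
val ssc = new StreamingContext(conf, Seconds(2))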
blockPushingThread is a thread that keeps pushing the generated blocks to the BlockManager:
private val blockPushingThread = new Thread() {
  override def run() { keepPushingBlocks() }
}

/** Keep pushing blocks to the BlockManager. */
private def keepPushingBlocks() {
  logInfo("Started block pushing thread")

  def areBlocksBeingGenerated: Boolean = synchronized {
    state != StoppedGeneratingBlocks
  }

  try {
    // While blocks are being generated, keep polling for to-be-pushed blocks and push them.
    while (areBlocksBeingGenerated) {
      Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
        case Some(block) => pushBlock(block)
        case None =>
      }
    }

    // At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks.
    logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
    while (!blocksForPushing.isEmpty) {
      val block = blocksForPushing.take()
      logDebug(s"Pushing block $block")
      pushBlock(block)
      logInfo("Blocks left to push " + blocksForPushing.size())
    }
    logInfo("Stopped block pushing thread")
  } catch {
    case ie: InterruptedException =>
      logInfo("Block pushing thread was interrupted")
    case e: Exception =>
      reportError("Error in block pushing thread", e)
  }
}
As the code shows, the thread keeps polling the blocksForPushing queue (with a 10 ms timeout per poll) and calls pushBlock on each block it retrieves:
private def pushBlock(block: Block) {
  listener.onPushBlock(block.id, block.buffer)
  logInfo("Pushed block " + block.id)
}
The listener here is the defaultBlockGeneratorListener defined in ReceiverSupervisorImpl:
private val defaultBlockGeneratorListener = new BlockGeneratorListener {
  def onAddData(data: Any, metadata: Any): Unit = { }

  def onGenerateBlock(blockId: StreamBlockId): Unit = { }

  def onError(message: String, throwable: Throwable) {
    reportError(message, throwable)
  }

  def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
    pushArrayBuffer(arrayBuffer, None, Some(blockId))
  }
}
So onPushBlock calls pushArrayBuffer, which in the end invokes the following method:
/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}
This method hands the data to receivedBlockHandler for storage and reports the block's metadata to the ReceiverTracker on the driver.
receivedBlockHandler has two implementations:
private val receivedBlockHandler: ReceivedBlockHandler = {
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    if (checkpointDirOption.isEmpty) {
      throw new SparkException(
        "Cannot enable receiver write-ahead log without checkpoint directory set. " +
          "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
          "See documentation for more details.")
    }
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}
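Which handler you get is decided by configuration. A minimal driver-side sketch of enabling the receiver write-ahead log; the checkpoint path and application name are illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// With the receiver WAL enabled, storeBlock() goes through
// WriteAheadLogBasedBlockHandler instead of BlockManagerBasedBlockHandler.
val conf = new SparkConf()
  .setAppName("WALDemo")                                        // illustrative app name
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(conf, Seconds(2))

// A checkpoint directory is mandatory once the receiver WAL is enabled.
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")              // illustrative path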
Either way, the data ultimately ends up in the BlockManager.
blocksForPushing is defined as follows:
private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
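The queue capacity can be tuned as well. A minimal sketch; the value 20 is illustrative:

import org.apache.spark.SparkConf

// Allow up to 20 unpushed blocks before the block-interval timer blocks on
// blocksForPushing.put(...).
val conf = new SparkConf().set("spark.streaming.blockQueueSize", "20")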
The blocks in blocksForPushing are produced by the blockIntervalTimer, which periodically moves the contents of the BlockGenerator's currentBuffer into the queue:
/** Change the buffer to which single records are added to. */
private def updateCurrentBuffer(time: Long): Unit = {
  try {
    var newBlock: Block = null
    synchronized {
      if (currentBuffer.nonEmpty) {
        val newBlockBuffer = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        listener.onGenerateBlock(blockId)
        newBlock = new Block(blockId, newBlockBuffer)
      }
    }

    if (newBlock != null) {
      blocksForPushing.put(newBlock)  // put is blocking when queue is full
    }
  } catch {
    case ie: InterruptedException =>
      logInfo("Block updating timer thread was interrupted")
    case e: Exception =>
      reportError("Error in block updating thread", e)
  }
}
Now let's go back to the supervisor's startReceiver() method:
/** Start receiver */
def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) {
      logInfo("Starting receiver")
      receiverState = Started
      receiver.onStart()
      logInfo("Called receiver onStart")
    } else {
      // The driver refused us
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
    }
  } catch {
    case NonFatal(t) =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}
It calls the receiver's onStart method. Take SocketReceiver as an example:
def onStart() {
  // Start the thread that receives data over a connection
  new Thread("Socket Receiver") {
    setDaemon(true)
    override def run() { receive() }
  }.start()
}
This function spawns a new thread named "Socket Receiver", which, once started, calls SocketReceiver's receive() method:
def receive() {
  var socket: Socket = null
  try {
    logInfo("Connecting to " + host + ":" + port)
    socket = new Socket(host, port)
    logInfo("Connected to " + host + ":" + port)
    val iterator = bytesToObjects(socket.getInputStream())
    while(!isStopped && iterator.hasNext) {
      store(iterator.next)
    }
    if (!isStopped()) {
      restart("Socket data stream had no more data")
    } else {
      logInfo("Stopped receiving")
    }
  } catch {
    case e: java.net.ConnectException =>
      restart("Error connecting to " + host + ":" + port, e)
    case NonFatal(e) =>
      logWarning("Error receiving data", e)
      restart("Error receiving data", e)
  } finally {
    if (socket != null) {
      socket.close()
      logInfo("Closed socket to " + host + ":" + port)
    }
  }
}
receive() builds a Socket, keeps reading records from its InputStream, and calls store once for each record it receives:
def store(dataItem: T) {
  supervisor.pushSingle(dataItem)
}
The data is managed by the ReceiverSupervisor; store delegates to supervisor.pushSingle to hand the record over:
def pushSingle(data: Any) {
  defaultBlockGenerator.addData(data)
}
defaultBlockGenerator is defined as follows:
private val defaultBlockGenerator = createBlockGenerator(defaultBlockGeneratorListener)

override def createBlockGenerator(
    blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
  // Cleanup BlockGenerators that have already been stopped
  registeredBlockGenerators --= registeredBlockGenerators.filter{ _.isStopped() }

  val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
  registeredBlockGenerators += newBlockGenerator
  newBlockGenerator
}
It is simply a BlockGenerator, and its addData function appends the record to the BlockGenerator's currentBuffer (the waitToPush() call comes from RateLimiter, which throttles the receiver according to spark.streaming.receiver.maxRate):
/**
 * Push a single data item into the buffer.
 */
def addData(data: Any): Unit = {
  if (state == Active) {
    waitToPush()
    synchronized {
      if (state == Active) {
        currentBuffer += data
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }
  } else {
    throw new SparkException(
      "Cannot add data as BlockGenerator has not been started or has been stopped")
  }
}
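Any user-defined receiver goes through exactly the same store() → pushSingle() → addData() path as SocketReceiver. Here is a minimal sketch of a custom receiver, assuming a made-up data source; the class name CounterReceiver and the once-per-second counter are purely illustrative:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Illustrative only: a receiver that emits an incrementing counter once per second.
// Every store() call below ends up in BlockGenerator.currentBuffer via
// ReceiverSupervisorImpl.pushSingle, just like SocketReceiver's store().
class CounterReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

  def onStart(): Unit = {
    new Thread("Counter Receiver") {
      setDaemon(true)
      override def run(): Unit = {
        var i = 0L
        while (!isStopped()) {
          store(s"record-$i")   // hands the record to the ReceiverSupervisor
          i += 1
          Thread.sleep(1000)
        }
      }
    }.start()
  }

  def onStop(): Unit = { }  // the receiving thread checks isStopped() and exits on its own
}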
To summarize: records stream in continuously; every 200 ms (by default) the contents of currentBuffer are turned into a Block and placed on the blocksForPushing queue, and a fresh currentBuffer is created. The block-pushing thread keeps polling that queue (with a 10 ms timeout per poll) and writes each block it takes to the BlockManager.
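Putting it all together from the application's point of view, here is a minimal sketch of a driver program that exercises this receiving pipeline end to end; the host, port, batch duration, and tuning values are illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SocketWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SocketWordCount")                      // illustrative app name
      .set("spark.streaming.blockInterval", "200ms")      // how often currentBuffer becomes a Block
      .set("spark.streaming.blockQueueSize", "10")        // capacity of blocksForPushing
    val ssc = new StreamingContext(conf, Seconds(2))

    // socketTextStream creates a SocketInputDStream whose SocketReceiver runs
    // the receive() loop analyzed above on an executor.
    val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

When this program runs, every line read from port 9999 travels through store() → currentBuffer → blocksForPushing → BlockManager before becoming part of a batch RDD.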
Notes:
1. DT大数据梦工厂 WeChat official account: DT_Spark
2. IMF 8 pm big data hands-on YY live channel: 68917580
3. Sina Weibo: http://www.weibo.com/ilovepains
This article originally appeared on the "叮咚" blog; please keep this attribution: http://lqding.blog.51cto.com/9123978/1774426