Lesson 10: Spark Streaming Source Code Walkthrough — A Thorough Study of the Full Lifecycle of Continuous Stream Data Reception
Posted by michaelli916
1. The design pattern of the data-reception architecture
2. A thorough study of the data-reception source code
Characteristics of data reception in Spark Streaming:
First, data is received continuously and without interruption.
Second, the receiver and the driver generally do not run in the same process; after receiving data, the receiver continuously reports to the driver, and the driver schedules work based on the metadata of the received data.
Architecturally, there is a loop that keeps receiving data, storing it, and reporting to the driver; receiving and storing are not handled by the same object. The Receiver is roughly the M in an MVC architecture; the ReceiverSupervisor is roughly the C (the receiver is started by the ReceiverSupervisor and hands the received data to it for storage, which is why the ReceiverSupervisor is the controller); and the driver is roughly the V, which presents/consumes the data.
The metadata is like a pointer, except that it needs its own set of parsing rules.
The ReceiverTracker launches receivers by submitting one job after another: each job has exactly one task, and that task runs a function whose only purpose is to create a ReceiverSupervisor and use it to start the receiver.
When the ReceiverTracker starts, it creates a ReceiverTrackerEndpoint, which is used for status reporting while the receivers are running.
// endpoint is created when generator starts.
// This not being null means the tracker has been started and not stopped
private var endpoint: RpcEndpointRef = null
/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
if (isTrackerStarted) {
throw new SparkException("ReceiverTracker already started")
}
if (!receiverInputStreams.isEmpty) {
endpoint = ssc.env.rpcEnv.setupEndpoint(
"ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
if (!skipReceiverLaunch) launchReceivers()
logInfo("ReceiverTracker started")
trackerState = Started
}
}
/**
* Get the receivers from the ReceiverInputDStreams, distributes them to the
* worker nodes as a parallel collection, and runs them.
*/
private def launchReceivers(): Unit = {
// The receivers obtained from the input-source InputDStreams are driver-side objects: they are just serializable descriptions/references (metadata) that will be shipped to the executors
val receivers = receiverInputStreams.map(nis => {
val rcvr = nis.getReceiver()
rcvr.setReceiverId(nis.id)
rcvr
})
// Run a dummy Spark job for load balancing and to make sure the workers are alive
runDummySparkJob()
logInfo("Starting " + receivers.length + " receivers")
// Send the StartAllReceivers message to the ReceiverTrackerEndpoint
endpoint.send(StartAllReceivers(receivers))
}
Handling of the message on the tracker side:
case StartAllReceivers(receivers) =>
val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
// Loop over the receivers, handling one at a time
for (receiver <- receivers) {
val executors = scheduledLocations(receiver.streamId)
updateReceiverScheduledExecutors(receiver.streamId, executors)
receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
startReceiver(receiver, executors)
}
The key method is startReceiver(receiver, executors):
/**
* Start a receiver along with its scheduled executors
*/
private def startReceiver(
receiver: Receiver[_],
scheduledLocations: Seq[TaskLocation]): Unit = {
def shouldStartReceiver: Boolean = {
// It's okay to start when trackerState is Initialized or Started
!(isTrackerStopping || isTrackerStopped)
}
val receiverId = receiver.streamId
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
return
}
val checkpointDirOption = Option(ssc.checkpointDir)
val serializableHadoopConf =
new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)
// Function to start the receiver on the worker node
// This function, wrapped as an RDD function, is what actually starts the receiver on the worker
// The function passed to submitJob takes the partition's iterator as its input
// Here the iterator's underlying collection contains only a single receiver
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
(iterator: Iterator[Receiver[_]]) => {
if (!iterator.hasNext) {
throw new SparkException(
"Could not start receiver as object not found.")
}
if (TaskContext.get().attemptNumber() == 0) {
val receiver = iterator.next()
assert(iterator.hasNext == false)
// Create the supervisor (the "controller" in the MVC analogy)
val supervisor = new ReceiverSupervisorImpl(
receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
supervisor.start()
supervisor.awaitTermination()
} else {
// It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
}
}
// Create the RDD using the scheduledLocations to run the receiver in a Spark job
// Create an RDD whose only purpose is to carry the receiver; like any RDD, it takes preferred locations (data locality) into account
val receiverRDD: RDD[Receiver[_]] =
if (scheduledLocations.isEmpty) {
// A single-element RDD holding just this one receiver
ssc.sc.makeRDD(Seq(receiver), 1)
} else {
val preferredLocations = scheduledLocations.map(_.toString).distinct
ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
}
receiverRDD.setName(s"Receiver $receiverId")
ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
// Submit a Spark job to start the receiver; the RDD is executed as a task on the chosen node
val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
// We will keep restarting the receiver job until ReceiverTracker is stopped
future.onComplete {
case Success(_) =>
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
} else {
logInfo(s"Restarting Receiver $receiverId")
self.send(RestartReceiver(receiver))
}
case Failure(e) =>
if (!shouldStartReceiver) {
onReceiverJobFinish(receiverId)
} else {
logError("Receiver has been stopped. Try to restart it.", e)
logInfo(s"Restarting Receiver $receiverId")
self.send(RestartReceiver(receiver))
}
}(submitJobThreadPool)
logInfo(s"Receiver ${receiver.streamId} started")
}
The controller, ReceiverSupervisorImpl, is responsible for handling the data received by the receiver: it stores the data and reports it to the driver.
The receiver receives data in a loop, one record at a time — from Kafka, a socket, key/value formats, and so on.
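To make the record-by-record pattern concrete before diving into the supervisor, here is a hedged sketch of a user-defined receiver built on the public Receiver API (the host, port, and line-based parsing are illustrative assumptions; it mirrors the built-in SocketReceiver shown later in this post):
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// A minimal custom receiver: reads lines from a socket and hands each
// record to the supervisor via store(), one record at a time.
class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    // onStart() must not block, so receive on a separate daemon thread.
    new Thread("Line Receiver") {
      setDaemon(true)
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = { /* the receiving thread checks isStopped() */ }

  private def receive(): Unit = {
    var socket: Socket = null
    try {
      socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line) // each call eventually reaches ReceiverSupervisorImpl.pushSingle
        line = reader.readLine()
      }
      restart("Trying to connect again")
    } catch {
      case e: java.io.IOException => restart("Error receiving data", e)
    } finally {
      if (socket != null) socket.close()
    }
  }
}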
/**
* Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
* which provides all the necessary functionality for handling the data received by
* the receiver. Specifically, it creates a [[org.apache.spark.streaming.receiver.BlockGenerator]]
* object that is used to divide the received data stream into blocks of data.
*/
private[streaming] class ReceiverSupervisorImpl(
receiver: Receiver[_],
env: SparkEnv,
hadoopConf: Configuration,
checkpointDirOption: Option[String]
) extends ReceiverSupervisor(receiver, env.conf) with Logging {
ReceiverSupervisorImpl is created inside the function built in ReceiverTracker.startReceiver() (which is ultimately triggered by ReceiverTracker.start()) and runs on the executor. Looking at its construction, we can see that ReceiverSupervisorImpl obtains a reference to the ReceiverTracker's endpoint when it is created:
/** Remote RpcEndpointRef for the ReceiverTracker */
// This ultimately resolves to the ReceiverTracker's endpoint on the driver
private val trackerEndpoint = RpcUtils.makeDriverRef("ReceiverTracker", env.conf, env.rpcEnv)
// Create a new RPC endpoint; it lives inside ReceiverSupervisorImpl on the executor and receives messages sent from the driver
/** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */
private val endpoint = env.rpcEnv.setupEndpoint(
"Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
override val rpcEnv: RpcEnv = env.rpcEnv
override def receive: PartialFunction[Any, Unit] = {
case StopReceiver =>
logInfo("Received stop signal")
ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
// Important: after each batch is processed, the driver sends this message so that old data blocks can be cleaned up
case CleanupOldBlocks(threshTime) =>
logDebug("Received delete old batch signal")
cleanupOldBlocks(threshTime)
// Key message: the driver sends this to dynamically adjust the receiving rate (rate limiting / backpressure)
case UpdateRateLimit(eps) =>
logInfo(s"Received a new rate limit: $eps.")
registeredBlockGenerators.foreach { bg =>
bg.updateRate(eps)
}
}
})
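The UpdateRateLimit message handled above is the executor-side half of Spark Streaming's rate control. As a hedged sketch (the values are illustrative, not recommendations), these are the user-facing settings that feed this path: a static per-receiver cap, and backpressure, where the driver recomputes the rate after every batch and pushes it to the receivers via exactly this message.
import org.apache.spark.SparkConf

// Illustrative configuration sketch; tune the numbers for your own workload.
val conf = new SparkConf()
  .setAppName("receiver-rate-control-demo")
  // Static upper bound: at most 10,000 records/second per receiver.
  .set("spark.streaming.receiver.maxRate", "10000")
  // Dynamic rate control: the driver estimates a sustainable rate after each
  // batch and sends UpdateRateLimit to every ReceiverSupervisorImpl.
  .set("spark.streaming.backpressure.enabled", "true")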
So both ReceiverTracker (on the driver) and ReceiverSupervisorImpl (on the executor) have their own internal RPC endpoints.
After data is received record by record, it needs to be packed into blocks and is then handled at block granularity.
Let's look at ReceiverSupervisorImpl's start() method and the methods it calls (some of which live in its parent class, ReceiverSupervisor):
/** Start the supervisor */
def start() {
onStart()
startReceiver()
}
override protected def onStart() {
registeredBlockGenerators.foreach { _.start() }
}
private val registeredBlockGenerators = new mutable.ArrayBuffer[BlockGenerator]
with mutable.SynchronizedBuffer[BlockGenerator]
BlockGenerator starts two threads: one periodically seals the data received during the current block interval into a block, and the other hands the sealed blocks to the BlockManager for storage:
/**
* Generates batches of objects received by a
* [[org.apache.spark.streaming.receiver.Receiver]] and puts them into appropriately
* named blocks at regular intervals. This class starts two threads,
* one to periodically start a new batch and prepare the previous batch of data as a block,
* the other to push the blocks into the block manager.
*
* Note: Do not create BlockGenerator instances directly inside receivers. Use
* `ReceiverSupervisor.createBlockGenerator` to create a BlockGenerator and use it.
*/
// The ReceiverSupervisor is started before the receiver. BlockGenerator runs two threads, and
// thread startup takes time; to make sure those threads are ready before the receiver's data
// arrives, the BlockGenerator is created by the ReceiverSupervisor rather than inside the
// receiver itself. Rate limiting also comes in here: BlockGenerator extends RateLimiter.
private[streaming] class BlockGenerator(
listener: BlockGeneratorListener,
receiverId: Int,
conf: SparkConf,
clock: Clock = new SystemClock()
) extends RateLimiter(conf) with Logging {
The default BlockGenerator is created when ReceiverSupervisorImpl is instantiated:
private val registeredBlockGenerators = new mutable.ArrayBuffer[BlockGenerator]
with mutable.SynchronizedBuffer[BlockGenerator]
/** Divides received data records into data blocks for pushing in BlockManager. */
private val defaultBlockGeneratorListener = new BlockGeneratorListener {
def onAddData(data: Any, metadata: Any): Unit = { }
def onGenerateBlock(blockId: StreamBlockId): Unit = { }
def onError(message: String, throwable: Throwable) {
reportError(message, throwable)
}
def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
pushArrayBuffer(arrayBuffer, None, Some(blockId))
}
}
private val defaultBlockGenerator = createBlockGenerator(defaultBlockGeneratorListener)
One BlockGenerator serves one input stream.
override def createBlockGenerator(
blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
// Cleanup BlockGenerators that have already been stopped
registeredBlockGenerators --= registeredBlockGenerators.filter{ _.isStopped() }
val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
registeredBlockGenerators += newBlockGenerator
newBlockGenerator
}
ReceiverSupervisorImpl's start() method calls onStart() and startReceiver(); onStart() in turn calls the start() method of each registered BlockGenerator:
/** Start block generating and pushing threads. */
def start(): Unit = synchronized {
if (state == Initialized) {
state = Active
blockIntervalTimer.start()
blockPushingThread.start()
logInfo("Started BlockGenerator")
} else {
throw new SparkException(
s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
}
}
A recurring timer is used to keep merging the buffered data into blocks:
private val blockIntervalTimer =
new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
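To make the two-thread pattern easier to see in isolation, here is a hedged, simplified stand-in written with plain java.util.concurrent primitives. It mirrors BlockGenerator's structure (current buffer, recurring timer, bounded queue, pushing thread) but is illustrative only, not Spark code:
import java.util.concurrent.{ArrayBlockingQueue, Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Simplified stand-in for BlockGenerator: records accumulate in currentBuffer;
// every intervalMs the timer seals the buffer into a "block" and enqueues it;
// a separate pusher thread drains the bounded queue.
class TinyBlockGenerator(intervalMs: Long, pushBlock: Seq[Any] => Unit) {
  private var currentBuffer = new ArrayBuffer[Any]
  private val blocksForPushing = new ArrayBlockingQueue[Seq[Any]](10)
  private val timer = Executors.newSingleThreadScheduledExecutor()
  @volatile private var stopped = false

  def addData(record: Any): Unit = synchronized { currentBuffer += record }

  def start(): Unit = {
    timer.scheduleAtFixedRate(new Runnable {
      def run(): Unit = {
        val sealedBlock = TinyBlockGenerator.this.synchronized {
          val buf = currentBuffer
          currentBuffer = new ArrayBuffer[Any]
          buf
        }
        if (sealedBlock.nonEmpty) blocksForPushing.put(sealedBlock) // blocks when queue is full
      }
    }, intervalMs, intervalMs, TimeUnit.MILLISECONDS)

    new Thread("block-pushing-thread") {
      setDaemon(true)
      override def run(): Unit = {
        while (!stopped) {
          val block = blocksForPushing.poll(100, TimeUnit.MILLISECONDS)
          if (block != null) pushBlock(block) // in Spark this hands off to the BlockManager
        }
      }
    }.start()
  }

  def stop(): Unit = { stopped = true; timer.shutdown() }
}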
ReceiverSupervisor's start() method calls onStart() and startReceiver(); now let's look at startReceiver():
/** Start receiver */
def startReceiver(): Unit = synchronized {
try {
if (onReceiverStart()) {
logInfo("Starting receiver")
receiverState = Started
receiver.onStart()
logInfo("Called receiver onStart")
} else {
// The driver refused us
stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
}
} catch {
case NonFatal(t) =>
stop("Error starting receiver " + streamId, Some(t))
}
}
Now look at ReceiverSupervisorImpl's onReceiverStart():
override protected def onReceiverStart(): Boolean = {
val msg = RegisterReceiver(
streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
// askWithRetry keeps retrying until a reply is received
// Send the RegisterReceiver message to the ReceiverTrackerEndpoint
trackerEndpoint.askWithRetry[Boolean](msg)
}
When the ReceiverTrackerEndpoint receives this message, it routes it to registerReceiver() to register the receiver:
// Remote messages
case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
val successful =
registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
context.reply(successful)
/** Register a receiver */
private def registerReceiver(
streamId: Int,
typ: String,
host: String,
executorId: String,
receiverEndpoint: RpcEndpointRef,
senderAddress: RpcAddress
): Boolean = {
if (!receiverInputStreamIds.contains(streamId)) {
throw new SparkException("Register received for unexpected id " + streamId)
}
if (isTrackerStopping || isTrackerStopped) {
return false
}
val scheduledLocations = receiverTrackingInfos(streamId).scheduledLocations
val acceptableExecutors = if (scheduledLocations.nonEmpty) {
// This receiver is registering and it's scheduled by
// ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledLocations" to check it.
scheduledLocations.get
} else {
// This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling
// "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.
scheduleReceiver(streamId)
}
def isAcceptable: Boolean = acceptableExecutors.exists {
case loc: ExecutorCacheTaskLocation => loc.executorId == executorId
case loc: TaskLocation => loc.host == host
}
if (!isAcceptable) {
// Refuse it since it's scheduled to a wrong executor
false
} else {
val name = s"${typ}-${streamId}"
val receiverTrackingInfo = ReceiverTrackingInfo(
streamId,
ReceiverState.ACTIVE,
scheduledLocations = None,
runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
name = Some(name),
endpoint = Some(receiverEndpoint))
receiverTrackingInfos.put(streamId, receiverTrackingInfo)
listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
true
}
}
The startReceiver() method called from ReceiverSupervisor.start() invokes receiver.onStart():
/**
* This method is called by the system when the receiver is started. This function
* must initialize all resources (threads, buffers, etc.) necessary for receiving data.
* This function must be non-blocking, so receiving the data must occur on a different
* thread. Received data can be stored with Spark by calling `store(data)`.
*
* If there are errors in threads started here, then following options can be done
* (i) `reportError(...)` can be called to report the error to the driver.
* The receiving of data will continue uninterrupted.
* (ii) `stop(...)` can be called to stop receiving data. This will call `onStop()` to
* clear up all resources allocated (threads, buffers, etc.) during `onStart()`.
* (iii) `restart(...)` can be called to restart the receiver. This will call `onStop()`
* immediately, and then `onStart()` after a delay.
*/
def onStart()
Here is a concrete Receiver subclass implementation (SocketReceiver):
def onStart() {
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
setDaemon(true)
override def run() { receive() }
}.start()
}
It starts a thread that runs receive(), which keeps receiving data:
/** Create a socket connection and receive data until receiver is stopped */
def receive() {
var socket: Socket = null
try {
logInfo("Connecting to " + host + ":" + port)
socket = new Socket(host, port)
logInfo("Connected to " + host + ":" + port)
val iterator = bytesToObjects(socket.getInputStream())
while(!isStopped && iterator.hasNext) {
// Call store() to hand the record over for storage
store(iterator.next)
}
if (!isStopped()) {
restart("Socket data stream had no more data")
} else {
logInfo("Stopped receiving")
}
} catch {
case e: java.net.ConnectException =>
restart("Error connecting to " + host + ":" + port, e)
case NonFatal(e) =>
logWarning("Error receiving data", e)
restart("Error receiving data", e)
} finally {
if (socket != null) {
socket.close()
logInfo("Closed socket to " + host + ":" + port)
}
}
}
The actual storing of data is delegated to ReceiverSupervisorImpl:
/**
* Store a single item of received data to Spark's memory.
* These single items will be aggregated together into data blocks before
* being pushed into Spark's memory.
*/
def store(dataItem: T) {
supervisor.pushSingle(dataItem)
}
/** Push a single record of received data into block generator. */
def pushSingle(data: Any) {
defaultBlockGenerator.addData(data)
}
BlockGenerator.addData:
/**
* Push a single data item into the buffer.
*/
def addData(data: Any): Unit = {
if (state == Active) {
waitToPush()
//Re-check the state inside the synchronized block: the generator may have been stopped while waitToPush() was blocking
synchronized {
if (state == Active) {
currentBuffer += data
} else {
throw new SparkException(
"Cannot add data as BlockGenerator has not been started or has been stopped")
}
}
} else {
throw new SparkException(
"Cannot add data as BlockGenerator has not been started or has been stopped")
}
}
The recurring timer keeps turning the buffered data into blocks, one at a time.
By default Spark generates one block every 200 ms. This is configurable via spark.streaming.blockInterval, but it is recommended not to go below 50 ms: each block is handed to one task for computation, and blocks that are too small make the task-launch overhead significant relative to the work they carry:
BlockGenerator.scala:
private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
require(blockIntervalMs > 0, s"'spark.streaming.blockInterval' should be a positive value")
private val blockIntervalTimer =
new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
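For reference, here is a hedged sketch of tuning these settings from application code (the values are only illustrative; the rule of thumb is batch interval / block interval ≈ number of tasks per batch per receiver, so a 2 s batch with a 200 ms block interval yields roughly 10 tasks):
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative values only.
val conf = new SparkConf()
  .setAppName("block-interval-demo")
  // One block (and therefore one downstream task) every 200 ms per receiver.
  .set("spark.streaming.blockInterval", "200ms")
  // Capacity of the blocksForPushing queue read in the source above.
  .set("spark.streaming.blockQueueSize", "10")
val ssc = new StreamingContext(conf, Seconds(2))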
Generating a block:
/** Change the buffer to which single records are added to. */
private def updateCurrentBuffer(time: Long): Unit = {
try {
var newBlock: Block = null
synchronized {
if (currentBuffer.nonEmpty) {
val newBlockBuffer = currentBuffer
currentBuffer = new ArrayBuffer[Any]
val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
listener.onGenerateBlock(blockId)
newBlock = new Block(blockId, newBlockBuffer)
}
}
if (newBlock != null) {
blocksForPushing.put(newBlock) // put is blocking when queue is full
}
} catch {
case ie: InterruptedException =>
logInfo("Block updating timer thread was interrupted")
case e: Exception =>
reportError("Error in block updating thread", e)
}
}
ReceiverSupervisorImpl.pushAndReportBlock()
/** Store block and report it to driver */
def pushAndReportBlock(
receivedBlock: ReceivedBlock,
metadataOption: Option[Any],
blockIdOption: Option[StreamBlockId]
) {
val blockId = blockIdOption.getOrElse(nextBlockId)
val time = System.currentTimeMillis
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
val numRecords = blockStoreResult.numRecords
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
logDebug(s"Reported block $blockId")
}
Summary: the receiver data-reception flow
Overall process: when the ReceiverSupervisor starts, it starts the BlockGenerator and the Receiver. The BlockGenerator first starts a timer that periodically packs the received data into blocks, and then starts a thread that keeps pushing those blocks to the BlockManager. Meanwhile the Receiver, once started, keeps receiving data and keeps appending the received records to the BlockGenerator's currentBuffer, from which the BlockGenerator keeps draining data — so the whole reception process runs as a continuous loop.
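As a usage-level recap, here is a hedged sketch of the application code that triggers all of the machinery described above: socketTextStream creates a ReceiverInputDStream on the driver, ssc.start() leads the ReceiverTracker to submit the one-task receiver job, and from then on data flows Receiver → BlockGenerator → BlockManager while block metadata flows back to the ReceiverTracker (the host and port are illustrative):
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReceiverLifecycleDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ReceiverLifecycleDemo")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Creates a ReceiverInputDStream; the SocketReceiver itself only starts on an
    // executor after ssc.start() -> ReceiverTracker.start() -> launchReceivers().
    val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)

    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()            // the ReceiverTracker submits the receiver job here
    ssc.awaitTermination()
  }
}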
This post is based on teacher Wang Jialin's course "Source Code Version Customization Release Class"; many thanks to Wang Jialin!
Everyone is welcome to exchange technical ideas — let's learn together and make progress together!