Spark版本定制第9天:Receiver在Driver的精妙实现全生命周期彻底研究和思考

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark版本定制第9天:Receiver在Driver的精妙实现全生命周期彻底研究和思考相关的知识,希望对你有一定的参考价值。

本期内容:

1 Receiver生命周期

2 深度思考

  一切不能进行实时流处理的数据都是无效的数据。在流处理时代,SparkStreaming有着强大吸引力,而且发展前景广阔,加之Spark的生态系统,Streaming可以方便调用其他的诸如SQL,MLlib等强大框架,它必将一统天下。

  Spark Streaming运行时与其说是Spark Core上的一个流式处理框架,不如说是Spark Core上的一个最复杂的应用程序。如果可以掌握Spark streaming这个复杂的应用程序,那么其他的再复杂的应用程序都不在话下了。这里选择Spark Streaming作为版本定制的切入点也是大势所趋。

  在SparkStreaming中,都是利用Reiceiver来接受数据的。而receiver在节点上都是通过receiverTracker来管理的。在receiverTracker中会调用start方法,来启动消息循环体ReceiverTrackerEndPoint进行通信。

def start(): Unit = synchronized { 
  if (isTrackerStarted) { 
    throw new SparkException("ReceiverTracker already started") 
  } 
  
  if (!receiverInputStreams.isEmpty) { 
    endpoint = ssc.env.rpcEnv.setupEndpoint( 
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv)) 
    if (!skipReceiverLaunch) launchReceivers() 
    logInfo("ReceiverTracker started") 
    trackerState = Started 
  } 
} 

  在start方法会发送消息启动Receiver

private def launchReceivers(): Unit = { 
  val receivers = receiverInputStreams.map(nis => { 
    val rcvr = nis.getReceiver() 
    rcvr.setReceiverId(nis.id) 
    rcvr 
  }) 
  
  runDummySparkJob() 
  
  logInfo("Starting " + receivers.length + " receivers") 
  endpoint.send(StartAllReceivers(receivers)) 
} 

  接下来会startAllReceivers,在这里通过消息循环体ReceiverTrackerEndPoint。

case StartAllReceivers(receivers) => 
  val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors) 
  for (receiver <- receivers) { 
    val executors = scheduledLocations(receiver.streamId) 
    updateReceiverScheduledExecutors(receiver.streamId, executors) 
    receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation 
    startReceiver(receiver, executors) 
  } 

  在此方法中,如果发生Receiver对应的Job结束了,可是ReceiverTracker还没有停止。它将会向ReceiverTrackerEndpoint发送一个ReStartReceiver的方法来重新启动reveiver,它被封装在future.onComplete中

 future.onComplete { 
  case Success(_) => 
    if (!shouldStartReceiver) { 
      onReceiverJobFinish(receiverId) 
    } else { 
      logInfo(s"Restarting Receiver $receiverId") 
      self.send(RestartReceiver(receiver)) 
    } 
  case Failure(e) => 
    if (!shouldStartReceiver) { 
      onReceiverJobFinish(receiverId) 
    } else { 
      logError("Receiver has been stopped. Try to restart it.", e) 
      logInfo(s"Restarting Receiver $receiverId") 
      self.send(RestartReceiver(receiver)) 
    } 
}(submitJobThreadPool) 

  

 

备注:

资料来源于:DT_大数据梦工厂(Spark发行版本定制)

更多私密内容,请关注微信公众号:DT_Spark

如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580

以上是关于Spark版本定制第9天:Receiver在Driver的精妙实现全生命周期彻底研究和思考的主要内容,如果未能解决你的问题,请参考以下文章

Spark版本定制第13天:Driver容错

Spark 定制版:009~Spark Streaming源码解读之Receiver在Driver的精妙实现全生命周期彻底研究和思考

Spark版本定制第8天:RDD生成生命周期彻底

Spark版本定制第10天:流数据生命周期和思考

Spark版本定制第6天:Job动态生成和深度思考

Spark版本定制第7天:JobScheduler内幕实现和深度思考