Spark Version Customization, Day 13: Driver Fault Tolerance


This installment covers:

1. Fault-tolerance guarantees of the ReceivedBlockTracker

2. Fault-tolerance guarantees of the DStreamGraph and JobGenerator

  Any data that cannot be processed as a real-time stream is, in effect, stale data. In the stream-processing era, Spark Streaming is highly attractive and has broad prospects; thanks to the Spark ecosystem, Streaming can readily call on powerful frameworks such as Spark SQL and MLlib, making it a strong contender to dominate the field.

  At runtime, Spark Streaming is less a streaming framework on top of Spark Core than the most complex application running on Spark Core. If you can master an application as complex as Spark Streaming, no other application will pose a problem. That is why Spark Streaming is the natural entry point for this version-customization series.

  At the data level, the ReceivedBlockTracker records metadata for the entire Spark Streaming application.

  At the scheduling level, the DStreamGraph and JobGenerator are the core of Spark Streaming's scheduling; they record how far scheduling has progressed, which is tied to the application's logic.

 Let's start from the ReceiverTracker's side; its internal ReceivedBlockTracker does the actual bookkeeping.

  When a receiver reports a block, addBlock first writes the block's metadata to the WAL (if enabled) and, on success, appends it to the corresponding ReceivedBlockQueue:

def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
 try {
   val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
   if (writeResult) {
     synchronized {
       getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
     }
     logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
       s"block ${receivedBlockInfo.blockStoreResult.blockId}")
   } else {
     logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
       s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
   }
   writeResult
 } catch {
   case NonFatal(e) =>
     logError(s"Error adding block $receivedBlockInfo", e)
     false
 }
}
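The essential discipline in addBlock is "log first, then mutate memory": the in-memory queue is only updated after the event has been durably recorded, so a restarted driver can rebuild its state by replaying the log. A minimal plain-Scala sketch of that pattern (MiniTracker and its line-based log file are illustrative stand-ins, not Spark's code):

```scala
import java.io.{File, FileWriter}
import scala.collection.mutable
import scala.util.control.NonFatal

case class BlockInfo(streamId: Int, blockId: String)

class MiniTracker(logFile: File) {
  private val queues = mutable.Map.empty[Int, mutable.Queue[BlockInfo]]

  // Durably append one record per event (stand-in for the real WAL).
  private def writeToLog(info: BlockInfo): Boolean =
    try {
      val w = new FileWriter(logFile, true) // append mode
      try { w.write(s"${info.streamId},${info.blockId}\n"); true }
      finally w.close()
    } catch { case NonFatal(_) => false }

  def addBlock(info: BlockInfo): Boolean = {
    val ok = writeToLog(info)          // 1. durable record first
    if (ok) synchronized {             // 2. only then update memory
      queues.getOrElseUpdate(info.streamId, mutable.Queue.empty[BlockInfo]) += info
    }
    ok
  }

  def queued(streamId: Int): Seq[BlockInfo] =
    synchronized { queues.get(streamId).map(_.toSeq).getOrElse(Seq.empty) }
}

val tracker = new MiniTracker(File.createTempFile("mini-wal", ".log"))
tracker.addBlock(BlockInfo(0, "block-1"))
assert(tracker.queued(0).map(_.blockId) == Seq("block-1"))
```

If the write fails, the block never enters the queue, mirroring how the real addBlock returns false without touching the ReceivedBlockQueue.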

  Here is the writeToLog method:

private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
    if (isWriteAheadLogEnabled) {
      logTrace(s"Writing record: $record")
      try {
        writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
          clock.getTimeMillis())
        true
      } catch {
        case NonFatal(e) =>
          logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
          false
      }
    } else {
      true
    }
  }

  Next, look at how the tracker allocates the received blocks to a batch when a job is generated:

def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
      val streamIdToBlocks = streamIds.map { streamId =>
          (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
      }.toMap
      val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
      if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
        timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
        lastAllocatedBatchTime = batchTime
      } else {
        logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
      }
    } else {
      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
    }
  }
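The dequeueAll(x => true) call above drains every pending block for a stream into the new batch in one step. A quick illustration of that Queue behavior:

```scala
import scala.collection.mutable

// dequeueAll with an always-true predicate empties the queue and returns
// the removed elements in order -- exactly how allocateBlocksToBatch moves
// all of a stream's pending blocks into the batch being generated.
val pending = mutable.Queue("b1", "b2", "b3")
val forBatch = pending.dequeueAll(_ => true)
assert(forBatch == Seq("b1", "b2", "b3"))
assert(pending.isEmpty)
```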

  Although this call site invokes writeToLog on every batch allocation, the method itself checks internally whether the WAL is enabled and simply returns true when it is not.

  (writeToLog here is the same method shown above.)

  Once these writes complete, the data-level part of the ReceiverTracker's fault tolerance is in place.

  Next, consider fault tolerance from the job-generation, i.e. scheduling, perspective.

  Each time the JobGenerator produces jobs from the DStreamGraph at the fixed batchInterval, it then calls doCheckpoint:

private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
    if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
      logInfo("Checkpointing graph for time " + time)
      ssc.graph.updateCheckpointData(time)
      checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
    }
  }
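The guard in doCheckpoint fires only when the elapsed time since the graph's zero time is an exact multiple of the checkpoint duration. A plain-Scala sketch of that arithmetic (Duration and Time here are simplified stand-ins for Spark's classes):

```scala
// Simplified stand-ins (not Spark's classes) for the Time/Duration
// arithmetic behind the doCheckpoint guard.
case class Duration(ms: Long) {
  def isMultipleOf(that: Duration): Boolean = ms % that.ms == 0
}
case class Time(ms: Long) {
  def -(that: Time): Duration = Duration(ms - that.ms)
}

val zeroTime = Time(0)
val checkpointDuration = Duration(10000) // checkpoint every 10 s
// with a 2 s batch interval, only every fifth batch writes a checkpoint
val shouldWrite =
  (1 to 5).map(i => (Time(i * 2000) - zeroTime).isMultipleOf(checkpointDuration))
assert(shouldWrite == Vector(false, false, false, false, true))
```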

  When that condition holds, the method refreshes the checkpoint data for the given batch time by calling updateCheckpointData, which cascades from the graph through every output stream and its dependencies:

// DStreamGraph.updateCheckpointData
def updateCheckpointData(time: Time) {
    logInfo("Updating checkpoint data for time " + time)
    this.synchronized {
      outputStreams.foreach(_.updateCheckpointData(time))
    }
    logInfo("Updated checkpoint data for time " + time)
  }

  // DStream.updateCheckpointData
  private[streaming] def updateCheckpointData(currentTime: Time) {
    logDebug("Updating checkpoint data for time " + currentTime)
    checkpointData.update(currentTime)
    dependencies.foreach(_.updateCheckpointData(currentTime))
    logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData)
  }

  To summarize: the ReceivedBlockTracker provides fault tolerance at the data level, through the write-ahead log;

  the DStreamGraph and JobGenerator provide it at the scheduling level, through checkpointing.
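On driver restart, recovery is the mirror image of these two mechanisms: the tracker replays its WAL events to rebuild the pending queues and batch allocations, while the streaming context is reconstructed from the checkpoint. A hypothetical, simplified sketch of the replay idea (the event names mirror BlockAdditionEvent and BatchAllocationEvent, but the types and replay function here are illustrative, not Spark's code):

```scala
import scala.collection.mutable

// Simplified stand-ins for the tracker's WAL events.
sealed trait LogEvent
case class BlockAdded(streamId: Int, blockId: String) extends LogEvent
case class BatchAllocated(batchTime: Long, blockIds: Seq[String]) extends LogEvent

// Rebuild in-memory state purely from the event log.
def replay(events: Seq[LogEvent]): (Map[Int, List[String]], Map[Long, Seq[String]]) = {
  val pending = mutable.Map.empty[Int, List[String]]
  val batches = mutable.Map.empty[Long, Seq[String]]
  events.foreach {
    case BlockAdded(sid, bid) =>
      pending(sid) = pending.getOrElse(sid, Nil) :+ bid
    case BatchAllocated(t, bids) =>
      batches(t) = bids
      // blocks allocated to a batch leave the pending queues
      pending.keys.toSeq.foreach(sid => pending(sid) = pending(sid).filterNot(bids.contains))
  }
  (pending.toMap, batches.toMap)
}

val (pending, batches) = replay(Seq(
  BlockAdded(0, "b1"), BlockAdded(0, "b2"),
  BatchAllocated(1000L, Seq("b1", "b2")),
  BlockAdded(0, "b3")))
assert(pending(0) == List("b3"))
assert(batches(1000L) == Seq("b1", "b2"))
```

Because every state change was logged before it was applied, replaying the log in order reproduces exactly the state the driver held before it failed.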

 

Note:

Source: DT_大数据梦工厂 (Spark release customization series)

