How Spark Cleans Up Shuffle Intermediate Data

Posted by 鸿乃江边鸟


Background

This article is based on Spark 3.2.1.
We know that in Spark, shuffle data lives either in the External Shuffle Service (ESS) or in the executors' BlockManagers. So when does Spark actually delete this intermediate shuffle data? This article walks through that question.
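To make the question concrete, here is a minimal, hypothetical job (the object name and structure are purely illustrative): reduceByKey creates a ShuffleDependency behind the scenes, and once the job finishes and the shuffled RDD is no longer reachable on the driver, the cleanup mechanism described below is what eventually removes the shuffle files.

import org.apache.spark.sql.SparkSession

object ShuffleCleanupDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("shuffle-cleanup-demo").master("local[2]").getOrCreate()
    val sc = spark.sparkContext

    def runJob(): Long = {
      // reduceByKey introduces a ShuffleDependency, which the driver registers
      // with the ContextCleaner via registerShuffleForCleanup (shown below)
      sc.parallelize(1 to 1000)
        .map(i => (i % 10, 1))
        .reduceByKey(_ + _)
        .count()
    }

    runJob()      // after this returns, nothing strongly references the shuffled RDD any more
    System.gc()   // a GC (manual here, periodic on a real driver) lets the cleaner reclaim the shuffle
    spark.stop()
  }
}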

Analysis

Spark obviously has to delete shuffle output at some point, otherwise the disks would eventually fill up.
The component responsible for this cleanup is ContextCleaner. The relevant code looks like this:

private[spark] class ContextCleaner(
    sc: SparkContext,
    shuffleDriverComponents: ShuffleDriverComponents) extends Logging {

  /**
   * A buffer to ensure that `CleanupTaskWeakReference`s are not garbage collected as long as they
   * have not been handled by the reference queue.
   */
  private val referenceBuffer =
    Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap)

  private val referenceQueue = new ReferenceQueue[AnyRef]

  private val cleaningThread = new Thread() { override def run(): Unit = keepCleaning() }

  private val periodicGCService: ScheduledExecutorService =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc")

  ...

  /** Register a ShuffleDependency for cleanup when it is garbage collected. */
  def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]): Unit = {
    registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))
  }
}

On the driver, every time a ShuffleDependency is created it registers itself with the ContextCleaner. The call chain is:

   ShuffleDependency
   _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
                 ||
                 \/
   registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))
                       ||
                       \/
   referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
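For reference, CleanupTaskWeakReference (defined in ContextCleaner.scala) is essentially a WeakReference that carries the corresponding cleanup task along with it; its definition is roughly:

private class CleanupTaskWeakReference(
    val task: CleanupTask,
    referent: AnyRef,
    referenceQueue: ReferenceQueue[AnyRef])
  extends WeakReference(referent, referenceQueue)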

As you can see, the registered object is wrapped in a CleanupTaskWeakReference (a WeakReference subclass) and added to a buffer. If you are not familiar with WeakReference, the key property is this: once an object is reachable only through WeakReference instances, the GC is free to collect it, and when it does, the WeakReference itself is enqueued into the ReferenceQueue it was created with.
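Here is a minimal, standalone sketch of that JVM mechanism (plain Scala, not Spark code; all names are made up for illustration):

import java.lang.ref.{ReferenceQueue, WeakReference}

object WeakRefDemo {
  def main(args: Array[String]): Unit = {
    val queue = new ReferenceQueue[AnyRef]
    var payload: AnyRef = new Array[Byte](1024)          // strongly referenced for now
    val ref = new WeakReference[AnyRef](payload, queue)  // weakly referenced, tied to the queue

    payload = null                                       // drop the only strong reference
    System.gc()                                          // hint the JVM to collect it

    // Once the referent is collected, the WeakReference itself is enqueued,
    // which is exactly what keepCleaning() polls for via referenceQueue.remove(timeout).
    val enqueued = queue.remove(10000)                   // blocks for up to 10s
    println(s"enqueued reference: $enqueued, referent now: ${ref.get()}")  // referent is null
  }
}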

Now look at keepCleaning, the method that cleaningThread runs:

private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
    while (!stopped) {
      try {
        val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
          .map(_.asInstanceOf[CleanupTaskWeakReference])
        // Synchronize here to avoid being interrupted on stop()
        synchronized {
          reference.foreach { ref =>
            logDebug("Got cleaning task " + ref.task)
            referenceBuffer.remove(ref)
            ref.task match {
              case CleanRDD(rddId) =>
                doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
              case CleanShuffle(shuffleId) =>
                doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
              case CleanBroadcast(broadcastId) =>
                doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
              case CleanAccum(accId) =>
                doCleanupAccum(accId, blocking = blockOnCleanupTasks)
              case CleanCheckpoint(rddId) =>
                doCleanCheckpoint(rddId)
              case CleanSparkListener(listener) =>
                doCleanSparkListener(listener)
            }
          }
        }
      } catch {
        case ie: InterruptedException if stopped => // ignore
        case e: Exception => logError("Error in cleaning thread", e)
      }
    }
  }

It loops forever, pulling the weak references of GC'd objects off the associated queue and dispatching each cleanup task to the corresponding method, for example doCleanupShuffle:

def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = {
    try {
      if (mapOutputTrackerMaster.containsShuffle(shuffleId)) {
        logDebug("Cleaning shuffle " + shuffleId)
        // Shuffle must be removed before it's unregistered from the output tracker
        // to find blocks served by the shuffle service on deallocated executors
        shuffleDriverComponents.removeShuffle(shuffleId, blocking)
        mapOutputTrackerMaster.unregisterShuffle(shuffleId)
        listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
        logDebug("Cleaned shuffle " + shuffleId)
      } else {
        logDebug("Asked to cleanup non-existent shuffle (maybe it was already removed)")
      }
    } catch {
      case e: Exception => logError("Error cleaning shuffle " + shuffleId, e)
    }
  }

Here the removal is sent as a message to the BlockManagers, which delete the data belonging to that shuffle id.
The cleanup of persisted RDDs, Accumulators, Broadcast variables, CheckpointData and so on goes through the same mechanism.
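With the default local-disk shuffle components, shuffleDriverComponents.removeShuffle ultimately goes through BlockManagerMaster, which asks the driver's BlockManagerMasterEndpoint to broadcast a RemoveShuffle(shuffleId) message to every registered BlockManager (and to the external shuffle service for executors that have already been released). A simplified, paraphrased sketch of that path, not the verbatim Spark source:

// Paraphrased from BlockManagerMaster in Spark 3.2.x; details simplified.
def removeShuffle(shuffleId: Int, blocking: Boolean): Unit = {
  // the driver endpoint fans the request out to all BlockManagers and collects their acks
  val future = driverEndpoint.askSync[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
  if (blocking) {
    // only waits when spark.cleaner.referenceTracking.blocking.shuffle is enabled
    RpcUtils.INFINITE_TIMEOUT.awaitResult(future)
  }
}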
One extra note:
when you persist an RDD, the blocks are only written at task run time: each partition is stored in that executor's BlockManager under RDDBlockId(id, partition.index), at the requested storage level. The persist call itself merely records the storage level, as shown below:

private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
    // TODO: Handle changes of StorageLevel
    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
      throw SparkCoreErrors.cannotChangeStorageLevelError()
    }
    // If this is the first time this RDD is marked for persisting, register it
    // with the SparkContext for cleanups and accounting. Do this only once.
    if (storageLevel == StorageLevel.NONE) {
      sc.cleaner.foreach(_.registerRDDForCleanup(this))
      sc.persistRDD(this)
    }
    storageLevel = newLevel
    this
  }
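A short usage sketch (assuming an existing SparkContext named sc; the variable names are illustrative): persist only records the level and registers the RDD with the cleaner, and it is the first action that actually materializes the blocks.

import org.apache.spark.storage.StorageLevel

val cached = sc.parallelize(1 to 1000000).map(_ * 2).persist(StorageLevel.MEMORY_AND_DISK)
cached.count()      // first action: partitions are computed and stored as RDDBlockId(id, partitionIndex)
cached.count()      // second action: served from the executors' BlockManagers
cached.unpersist()  // explicit removal; otherwise the ContextCleaner removes the blocks once the RDD is GC'd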

Note: on the driver, periodicGCService (a ScheduledExecutorService) triggers a GC periodically so that unreferenced objects are actually collected and their weak references enqueued. The default interval is 30 minutes and can be changed with spark.cleaner.periodicGC.interval.
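A minimal configuration sketch of the cleaner-related settings (the values shown are just examples, not recommendations):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.cleaner.periodicGC.interval", "15min")                // default 30min: how often the driver forces a GC
  .set("spark.cleaner.referenceTracking", "true")                   // default true: enable the ContextCleaner
  .set("spark.cleaner.referenceTracking.blocking", "true")          // default true: block on non-shuffle cleanup tasks
  .set("spark.cleaner.referenceTracking.blocking.shuffle", "false") // default false: don't block on shuffle cleanup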

That wraps up the analysis of how Spark cleans up shuffle intermediate data.
