How Spark Cleans Up Intermediate Shuffle Data
Posted by 鸿乃江边鸟
Background
This article is based on Spark 3.2.1.
In Spark, shuffle data is stored either in the External Shuffle Service (ESS) or in the executors' BlockManagers. But when does Spark actually delete this intermediate shuffle data? Let's analyze that in this article.
Analysis
Spark must delete shuffle result data at some point; otherwise, over time the disks would eventually fill up. The component that performs this cleanup is ContextCleaner. The code looks like this:
private[spark] class ContextCleaner(
    sc: SparkContext,
    shuffleDriverComponents: ShuffleDriverComponents) extends Logging {

  /**
   * A buffer to ensure that `CleanupTaskWeakReference`s are not garbage collected as long as they
   * have not been handled by the reference queue.
   */
  private val referenceBuffer =
    Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap)

  private val referenceQueue = new ReferenceQueue[AnyRef]

  private val cleaningThread = new Thread() { override def run(): Unit = keepCleaning() }

  private val periodicGCService: ScheduledExecutorService =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc")

  ...

  /** Register a ShuffleDependency for cleanup when it is garbage collected. */
  def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]): Unit = {
    registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))
  }
On the Spark driver, every time a ShuffleDependency object is created, it registers itself with the ContextCleaner. The call chain is as follows:
ShuffleDependency
  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
          ||
          \/
  registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))
          ||
          \/
  referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
As you can see, the object is wrapped into a CleanupTaskWeakReference instance (which is itself a WeakReference) and placed in a buffer. You can read up on WeakReference in detail elsewhere; the key behavior is that if an object is reachable only through WeakReference instances, it becomes eligible for GC, and once it is collected, the weak reference is enqueued onto the ReferenceQueue associated with it.
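To make this behavior concrete, here is a minimal, self-contained sketch (plain JVM code, not Spark source; the names WeakRefDemo and payload are purely illustrative): once the only strong reference to an object is dropped and a GC runs, its WeakReference shows up on the associated ReferenceQueue, which is exactly the signal ContextCleaner's cleaning thread polls for.

import java.lang.ref.{ReferenceQueue, WeakReference}

object WeakRefDemo {
  def main(args: Array[String]): Unit = {
    val queue = new ReferenceQueue[AnyRef]
    var payload: AnyRef = new Object                    // the only strong reference to the object
    val weakRef = new WeakReference[AnyRef](payload, queue)

    payload = null                                      // drop the strong reference
    System.gc()                                         // hint a GC; collection is not guaranteed

    // Poll the queue the same way the cleaning thread does with referenceQueue.remove(timeout)
    val enqueued = Option(queue.remove(10000))
    println(s"enqueued after GC: ${enqueued.exists(_ eq weakRef)}")
  }
}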
Next, look at keepCleaning, the method run by the cleaningThread thread:
  private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
    while (!stopped) {
      try {
        val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
          .map(_.asInstanceOf[CleanupTaskWeakReference])
        // Synchronize here to avoid being interrupted on stop()
        synchronized {
          reference.foreach { ref =>
            logDebug("Got cleaning task " + ref.task)
            referenceBuffer.remove(ref)
            ref.task match {
              case CleanRDD(rddId) =>
                doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
              case CleanShuffle(shuffleId) =>
                doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
              case CleanBroadcast(broadcastId) =>
                doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
              case CleanAccum(accId) =>
                doCleanupAccum(accId, blocking = blockOnCleanupTasks)
              case CleanCheckpoint(rddId) =>
                doCleanCheckpoint(rddId)
              case CleanSparkListener(listener) =>
                doCleanSparkListener(listener)
            }
          }
        }
      } catch {
        case ie: InterruptedException if stopped => // ignore
        case e: Exception => logError("Error in cleaning thread", e)
      }
    }
  }
The thread loops, pulling references to garbage-collected objects from the associated queue and dispatching to the corresponding cleanup method, for example doCleanupShuffle:
  def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = {
    try {
      if (mapOutputTrackerMaster.containsShuffle(shuffleId)) {
        logDebug("Cleaning shuffle " + shuffleId)
        // Shuffle must be removed before it's unregistered from the output tracker
        // to find blocks served by the shuffle service on deallocated executors
        shuffleDriverComponents.removeShuffle(shuffleId, blocking)
        mapOutputTrackerMaster.unregisterShuffle(shuffleId)
        listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
        logDebug("Cleaned shuffle " + shuffleId)
      } else {
        logDebug("Asked to cleanup non-existent shuffle (maybe it was already removed)")
      }
    } catch {
      case e: Exception => logError("Error cleaning shuffle " + shuffleId, e)
    }
  }
Here the removal request is sent, as a message, to the BlockManagers so that they delete the corresponding shuffle data; the shuffle is also unregistered from the MapOutputTrackerMaster.
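Viewed from the user side, the whole chain looks roughly like the following hedged sketch (local mode; the object name ShuffleCleanupDemo, the data sizes, and the sleep are purely illustrative, and GC timing is never guaranteed): once nothing on the driver still strongly references a shuffle's ShuffleDependency, a driver-side GC makes it weakly reachable, and the cleaning thread eventually removes its map output and shuffle files.

import org.apache.spark.sql.SparkSession

object ShuffleCleanupDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("shuffle-cleanup-demo")
      .getOrCreate()
    val sc = spark.sparkContext

    def runShuffleJob(): Long = {
      // reduceByKey introduces a ShuffleDependency, which registers itself with ContextCleaner
      sc.parallelize(1 to 100000)
        .map(i => (i % 10, 1))
        .reduceByKey(_ + _)
        .count()
    }
    runShuffleJob() // after this returns, the job is done and the shuffled RDD goes out of scope

    // Normally periodicGCService triggers this every spark.cleaner.periodicGC.interval;
    // here we force a driver-side GC so the ShuffleDependency becomes weakly reachable
    // and its CleanupTaskWeakReference is enqueued for the cleaning thread.
    System.gc()
    Thread.sleep(5000) // give the cleaning thread a moment (illustrative only)

    spark.stop()
  }
}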
Cleanup of persisted RDDs, Accumulators, Broadcast variables, CheckpointData, and so on goes through the same mechanism.
One extra note: when you persist an RDD with the persist method, its partitions are only written to the corresponding executors' BlockManagers, as blocks named RDDBlockId(id, partition.index) and at the requested storage level, when the tasks actually run. The persist method itself only records the storage level, as shown below:
  private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
    // TODO: Handle changes of StorageLevel
    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
      throw SparkCoreErrors.cannotChangeStorageLevelError()
    }
    // If this is the first time this RDD is marked for persisting, register it
    // with the SparkContext for cleanups and accounting. Do this only once.
    if (storageLevel == StorageLevel.NONE) {
      sc.cleaner.foreach(_.registerRDDForCleanup(this))
      sc.persistRDD(this)
    }
    storageLevel = newLevel
    this
  }
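As a quick illustration (a hedged sketch that assumes an existing SparkContext named sc): persist() only records the level; the partitions appear as RDDBlockId(rdd.id, partitionIndex) blocks in the executors' BlockManagers only after an action runs the tasks.

import org.apache.spark.storage.StorageLevel

val data = sc.parallelize(1 to 1000).map(_ * 2)
data.persist(StorageLevel.MEMORY_AND_DISK) // only records the level and registers the RDD with ContextCleaner
data.count()                               // tasks run; partitions are now stored as RDD blocks
println(data.getStorageLevel)              // the level recorded by persist()
data.unpersist()                           // explicit cleanup; otherwise GC + ContextCleaner will handle it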
Note: on the driver, the periodicGCService (a ScheduledExecutorService) periodically triggers a GC. The default interval is 30 minutes, and it can be configured via spark.cleaner.periodicGC.interval.
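For example, a minimal sketch of lowering that interval when building the session (the 10min value is just for illustration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("periodic-gc-demo")
  .config("spark.cleaner.periodicGC.interval", "10min") // default is 30min
  .getOrCreate()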
That wraps up the analysis of how Spark cleans up intermediate shuffle data.