磁盘块管理器DiskBlockManager
Posted 大冰的小屋
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了磁盘块管理器DiskBlockManager相关的知识,希望对你有一定的参考价值。
DiskBlockManager管理和维护了逻辑上的Block和存储在Disk上的物理的Block的映射。默认情况下,一个逻辑的Block会根据它的BlockId生成的名字映射到一个物理上的文件。但是,也可以使用mapBlockToFileSegment方法映射到一个文件的一段区域。 这些物理文件会被hash到由spark.local.dir(或者通过SPARK_LOCAL_DIRS来设置)上的不同目录中
1. DiskBlockManager的构造过程
BlockManager在构造时会创建DiskBlockManager,DiskBlockManager的构造如下:
1. 调用createLocalDirs方法创建本地文件目录,然后创建二维数组subDirs,用来缓存一级目录localDirs及二级目录。二级目录的是数量配置通过spark.diskStore.subDirectories属性设置,默认为64。
DiskBlockManager为什么要创建二级目录结构?这是因为二级目录用于对文件进行散列存储,散列存储可以使所有文件都随机存放,写入或删除文件更方便,存取速度快,节省空间。
2. 添加运行时环境结束时的钩子,用于在进程关闭时创建线程,通过调用DiskBlockManager的stop方法,清除一些临时目录。
/**
* Creates and maintains the logical mapping between logical blocks and physical on-disk
* locations. By default, one block is mapped to one file with a name given by its BlockId.
* However, it is also possible to have a block map to only a segment of a file, by calling
* mapBlockToFileSegment().
*
* Block files are hashed among the directories listed in spark.local.dir (or in
* SPARK_LOCAL_DIRS, if it's set).
*/
private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkConf)
extends Logging
private[spark]
val subDirsPerLocalDir = blockManager.conf.getInt("spark.diskStore.subDirectories", 64)
/* Create one local directory for each path mentioned in spark.local.dir; then, inside this
* directory, create multiple subdirectories that we will hash files into, in order to avoid
* having really large inodes at the top level. */
private[spark] val localDirs: Array[File] = createLocalDirs(conf)
if (localDirs.isEmpty)
logError("Failed to create any local dir.")
System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
// The content of subDirs is immutable but the content of subDirs(i) is mutable. And the content
// of subDirs(i) is protected by the lock of subDirs(i)
private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
private val shutdownHook = addShutdownHook()
...
addShutdownHook方法的实现:
private def addShutdownHook(): AnyRef =
ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY + 1) () =>
logInfo("Shutdown hook called")
DiskBlockManager.this.doStop()
/** Cleanup local dirs and stop shuffle sender. */
private[spark] def stop()
// Remove the shutdown hook. It causes memory leaks if we leave it around.
try
ShutdownHookManager.removeShutdownHook(shutdownHook)
catch
case e: Exception =>
logError(s"Exception while removing shutdown hook.", e)
doStop()
private def doStop(): Unit =
// Only perform cleanup if an external service is not serving our shuffle files.
// Also blockManagerId could be null if block manager is not initialized properly.
if (!blockManager.externalShuffleServiceEnabled ||
(blockManager.blockManagerId != null && blockManager.blockManagerId.isDriver))
localDirs.foreach localDir =>
if (localDir.isDirectory() && localDir.exists())
try
if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(localDir))
Utils.deleteRecursively(localDir)
catch
case e: Exception =>
logError(s"Exception while deleting local spark dir: $localDir", e)
2. 获取磁盘文件方法 getFile
获取文件步骤:
1. 根据文件名计算哈希值;
2. 根据哈希值与本地文件一级目录的总数求余,记为dirId;
3. 根据哈希值与本地文件一级目录的总数求商,此商再与耳机目录的数目求余,记为subDirId;
4. 如果dirId/subDirId存在,则获取dirId/subDirId目录下的文件,否则新建dirId/subDirId目录
/** Looks up a file by hashing it into one of our local subdirectories. */
// This method should be kept in sync with
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
def getFile(filename: String): File =
// Figure out which local directory it hashes to, and which subdirectory in that
val hash = Utils.nonNegativeHash(filename)
val dirId = hash % localDirs.length
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
// Create the subdirectory if it doesn't already exist
val subDir = subDirs(dirId).synchronized
val old = subDirs(dirId)(subDirId)
if (old != null)
old
else
val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
if (!newDir.exists() && !newDir.mkdir())
throw new IOException(s"Failed to create local dir in $newDir.")
subDirs(dirId)(subDirId) = newDir
newDir
new File(subDir, filename)
3. 创建临时Block文件
DiskBlockManager会为本地数据创建临时文件和ShuffleMapTask运行结束的中间结果创建临时文件。
/** Produces a unique block id and File suitable for storing local intermediate results. */
def createTempLocalBlock(): (TempLocalBlockId, File) =
var blockId = new TempLocalBlockId(UUID.randomUUID())
while (getFile(blockId).exists())
blockId = new TempLocalBlockId(UUID.randomUUID())
(blockId, getFile(blockId))
/** Produces a unique block id and File suitable for storing shuffled intermediate results. */
def createTempShuffleBlock(): (TempShuffleBlockId, File) =
var blockId = new TempShuffleBlockId(UUID.randomUUID())
while (getFile(blockId).exists())
blockId = new TempShuffleBlockId(UUID.randomUUID())
(blockId, getFile(blockId))
以上是关于磁盘块管理器DiskBlockManager的主要内容,如果未能解决你的问题,请参考以下文章