Disk Block Manager: DiskBlockManager

Posted by 大冰的小屋

DiskBlockManager creates and maintains the mapping between logical blocks and the physical blocks stored on disk. By default, a logical block maps to a single physical file whose name is derived from its BlockId, but a block can also be mapped to a segment of a file via the mapBlockToFileSegment method. The physical files are hashed across the directories listed in spark.local.dir (or set through the SPARK_LOCAL_DIRS environment variable).
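
As a concrete configuration example, spark.local.dir takes a comma-separated list of root directories. A minimal sketch, assuming hypothetical paths:

import org.apache.spark.SparkConf

// Spread block files across two disks; the paths below are made-up examples.
val conf = new SparkConf()
  .setAppName("disk-block-manager-demo")
  .set("spark.local.dir", "/mnt/disk1/spark,/mnt/disk2/spark")
// Alternatively, export SPARK_LOCAL_DIRS=/mnt/disk1/spark,/mnt/disk2/spark before
// launching; when set, the environment variable takes precedence over spark.local.dir.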


1. Construction of DiskBlockManager

BlockManager creates a DiskBlockManager as part of its own construction. The construction of DiskBlockManager does the following:
1. Call the createLocalDirs method to create the local directories, then create the two-dimensional array subDirs, which caches the first-level directories (localDirs) and their second-level subdirectories. The number of second-level subdirectories per local directory is set by the spark.diskStore.subDirectories property and defaults to 64.
Why does DiskBlockManager create a two-level directory structure? Because files are hashed into the second-level directories: hashing spreads files evenly, so no single directory accumulates a huge number of entries, which keeps creating, deleting, and looking up files fast. The source comment below describes this as avoiding "really large inodes at the top level".
2. Register a shutdown hook that runs when the process exits; the hook calls DiskBlockManager's doStop method to delete the local directories and their contents.

/**
 * Creates and maintains the logical mapping between logical blocks and physical on-disk
 * locations. By default, one block is mapped to one file with a name given by its BlockId.
 * However, it is also possible to have a block map to only a segment of a file, by calling
 * mapBlockToFileSegment().
 *
 * Block files are hashed among the directories listed in spark.local.dir (or in
 * SPARK_LOCAL_DIRS, if it's set).
 */
private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkConf)
  extends Logging {

  private[spark]
  val subDirsPerLocalDir = blockManager.conf.getInt("spark.diskStore.subDirectories", 64)

  /* Create one local directory for each path mentioned in spark.local.dir; then, inside this
   * directory, create multiple subdirectories that we will hash files into, in order to avoid
   * having really large inodes at the top level. */
  private[spark] val localDirs: Array[File] = createLocalDirs(conf)
  if (localDirs.isEmpty) {
    logError("Failed to create any local dir.")
    System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
  }
  // The content of subDirs is immutable but the content of subDirs(i) is mutable. And the content
  // of subDirs(i) is protected by the lock of subDirs(i)
  private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))

  private val shutdownHook = addShutdownHook()
  ...
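
The constructor above calls createLocalDirs, which this excerpt elides. In the Spark source of this era it looks roughly like the following; Utils.getConfiguredLocalDirs and Utils.createDirectory are Spark-internal helpers, so treat this as a sketch rather than the exact code:

  /** Create one "blockmgr" directory under each configured root directory,
    * logging and skipping any root where creation fails. */
  private def createLocalDirs(conf: SparkConf): Array[File] = {
    Utils.getConfiguredLocalDirs(conf).flatMap { rootDir =>
      try {
        val localDir = Utils.createDirectory(rootDir, "blockmgr")
        logInfo(s"Created local directory at $localDir")
        Some(localDir)
      } catch {
        case e: IOException =>
          logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e)
          None
      }
    }
  }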

The implementation of the addShutdownHook method:

  private def addShutdownHook(): AnyRef = {
    ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () =>
      logInfo("Shutdown hook called")
      DiskBlockManager.this.doStop()
    }
  }

  /** Cleanup local dirs and stop shuffle sender. */
  private[spark] def stop() {
    // Remove the shutdown hook.  It causes memory leaks if we leave it around.
    try {
      ShutdownHookManager.removeShutdownHook(shutdownHook)
    } catch {
      case e: Exception =>
        logError(s"Exception while removing shutdown hook.", e)
    }
    doStop()
  }

  private def doStop(): Unit = {
    // Only perform cleanup if an external service is not serving our shuffle files.
    // Also blockManagerId could be null if block manager is not initialized properly.
    if (!blockManager.externalShuffleServiceEnabled ||
      (blockManager.blockManagerId != null && blockManager.blockManagerId.isDriver)) {
      localDirs.foreach { localDir =>
        if (localDir.isDirectory() && localDir.exists()) {
          try {
            if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(localDir)) {
              Utils.deleteRecursively(localDir)
            }
          } catch {
            case e: Exception =>
              logError(s"Exception while deleting local spark dir: $localDir", e)
          }
        }
      }
    }
  }

2. The getFile method for retrieving disk files

Looking up a file takes these steps:
1. Compute a non-negative hash of the filename;
2. Take the hash modulo the number of first-level local directories; this is dirId;
3. Divide the hash by the number of first-level directories, then take the quotient modulo the number of second-level subdirectories; this is subDirId;
4. If the directory dirId/subDirId already exists, look up the file under it; otherwise create dirId/subDirId first. (A worked example of this arithmetic follows the code below.)

  /** Looks up a file by hashing it into one of our local subdirectories. */
  // This method should be kept in sync with
  // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
  def getFile(filename: String): File = {
    // Figure out which local directory it hashes to, and which subdirectory in that
    val hash = Utils.nonNegativeHash(filename)
    val dirId = hash % localDirs.length
    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

    // Create the subdirectory if it doesn't already exist
    val subDir = subDirs(dirId).synchronized {
      val old = subDirs(dirId)(subDirId)
      if (old != null) {
        old
      } else {
        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
        if (!newDir.exists() && !newDir.mkdir()) {
          throw new IOException(s"Failed to create local dir in $newDir.")
        }
        subDirs(dirId)(subDirId) = newDir
        newDir
      }
    }

    new File(subDir, filename)
  }

3. Creating temporary block files

DiskBlockManager also creates temporary files: createTempLocalBlock for locally cached intermediate data, and createTempShuffleBlock for the intermediate results written when a ShuffleMapTask finishes. Both generate a UUID-based BlockId and retry until the corresponding file does not already exist.

  /** Produces a unique block id and File suitable for storing local intermediate results. */
  def createTempLocalBlock(): (TempLocalBlockId, File) = {
    var blockId = new TempLocalBlockId(UUID.randomUUID())
    while (getFile(blockId).exists()) {
      blockId = new TempLocalBlockId(UUID.randomUUID())
    }
    (blockId, getFile(blockId))
  }

  /** Produces a unique block id and File suitable for storing shuffled intermediate results. */
  def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
    var blockId = new TempShuffleBlockId(UUID.randomUUID())
    while (getFile(blockId).exists()) {
      blockId = new TempShuffleBlockId(UUID.randomUUID())
    }
    (blockId, getFile(blockId))
  }

Reference: 《深入理解Spark:核心思想与源码分析》 (Deep Understanding of Spark: Core Ideas and Source Code Analysis)
