Disk Storage: DiskStore



When the MemoryStore does not have enough space, DiskStore is used to write blocks to disk. DiskStore extends BlockStore and implements its getBytes, putBytes, putArray, and putIterator methods.
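
For orientation, a condensed sketch of the BlockStore contract that DiskStore fills in (signatures abbreviated from Spark 1.x; details may differ slightly between versions):

  private[spark] abstract class BlockStore(val blockManager: BlockManager) extends Logging {
    def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult
    def putArray(blockId: BlockId, values: Array[Any], level: StorageLevel,
        returnValues: Boolean): PutResult
    def putIterator(blockId: BlockId, values: Iterator[Any], level: StorageLevel,
        returnValues: Boolean): PutResult
    def getBytes(blockId: BlockId): Option[ByteBuffer]
    def getValues(blockId: BlockId): Option[Iterator[Any]]
    def getSize(blockId: BlockId): Long
    def remove(blockId: BlockId): Boolean
    def contains(blockId: BlockId): Boolean
  }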

val minMemoryMapBytes = blockManager.conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")

The spark.storage.memoryMapThreshold property sets the size above which Spark memory-maps a file region when reading a block from disk; it prevents Spark from memory-mapping very small blocks. In general, memory mapping carries high overhead for blocks close to or below the operating system's page size. Block files smaller than this threshold are therefore read directly into a byte buffer rather than memory-mapped.
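
The threshold can be tuned through SparkConf before the SparkContext is created. A minimal sketch (the 8m value is purely illustrative, not a recommendation):

  import org.apache.spark.SparkConf

  // Blocks smaller than 8 MB will be read into a heap buffer
  // instead of being memory-mapped (illustrative value only).
  val conf = new SparkConf()
    .setAppName("disk-store-demo")
    .set("spark.storage.memoryMapThreshold", "8m")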


1. NIO read method: getBytes

  private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
    val channel = new RandomAccessFile(file, "r").getChannel
    Utils.tryWithSafeFinally {
      // For small files, directly read rather than memory map
      if (length < minMemoryMapBytes) {
        val buf = ByteBuffer.allocate(length.toInt)
        channel.position(offset)
        while (buf.remaining() != 0) {
          if (channel.read(buf) == -1) {
            throw new IOException("Reached EOF before filling buffer\n" +
              s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
          }
        }
        buf.flip()
        Some(buf)
      } else {
        Some(channel.map(MapMode.READ_ONLY, offset, length))
      }
    } {
      channel.close()
    }
  }
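
Utils.tryWithSafeFinally, used above and in the write paths below, is a Spark-internal helper that behaves like try/finally but, when both the body and the finally block throw, attaches the finally-block exception as a suppressed exception instead of letting it mask the original one. A minimal sketch of the idea (not Spark's exact implementation):

  def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
    var originalThrowable: Throwable = null
    try {
      block
    } catch {
      case t: Throwable =>
        originalThrowable = t
        throw t
    } finally {
      try {
        finallyBlock
      } catch {
        // If the body already failed, do not mask its exception;
        // record the cleanup failure as suppressed instead.
        case t: Throwable if originalThrowable != null =>
          originalThrowable.addSuppressed(t)
      }
    }
  }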

2. NIO write method: putBytes

putBytes obtains the block's file through DiskBlockManager's getFile method, then writes the ByteBuffer to that file through an NIO channel:

  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
    // So that we do not modify the input offsets !
    // duplicate does not copy buffer, so inexpensive
    val bytes = _bytes.duplicate()
    logDebug(s"Attempting to put block $blockId")
    val startTime = System.currentTimeMillis
    val file = diskManager.getFile(blockId)
    val channel = new FileOutputStream(file).getChannel
    Utils.tryWithSafeFinally {
      while (bytes.remaining > 0) {
        channel.write(bytes)
      }
    } {
      channel.close()
    }
    val finishTime = System.currentTimeMillis
    logDebug("Block %s stored as %s file on disk in %d ms".format(
      file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
    PutResult(bytes.limit(), Right(bytes.duplicate()))
  }
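
The duplicate() calls matter: a duplicate shares the underlying bytes but carries its own position and limit, so draining it in the write loop leaves the caller's buffer untouched. A self-contained sketch of the same pattern outside Spark (the path and method name are made up for the example):

  import java.io.FileOutputStream
  import java.nio.ByteBuffer
  import java.nio.charset.StandardCharsets

  def writeBuffer(path: String, _bytes: ByteBuffer): Unit = {
    val bytes = _bytes.duplicate()  // independent cursor, shared contents
    val channel = new FileOutputStream(path).getChannel
    try {
      // A single write() may be partial, so loop until the buffer is drained.
      while (bytes.remaining > 0) {
        channel.write(bytes)
      }
    } finally {
      channel.close()
    }
  }

  val buf = ByteBuffer.wrap("hello disk store".getBytes(StandardCharsets.UTF_8))
  writeBuffer("/tmp/demo-block", buf)
  assert(buf.remaining == buf.limit)  // the caller's position is untouched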

3. Array write method: putArray

putArray simply delegates to putIterator:

  override def putArray(
      blockId: BlockId,
      values: Array[Any],
      level: StorageLevel,
      returnValues: Boolean): PutResult = {
    putIterator(blockId, values.toIterator, level, returnValues)
  }

4. Iterator write method: putIterator

putIterator writes a block in three steps:
1. Obtain the Block file for the blockId via DiskBlockManager's getFile method and wrap it in a FileOutputStream.
2. Call BlockManager's dataSerializeStream method, which serializes (and, if configured, compresses) the iterator's values into the FileOutputStream (a rough stand-in is sketched after the code below).
3. If the written data must be returned, read the file back into a ByteBuffer with getBytes and return it together with the file length in the PutResult; otherwise return only the file length.

  override def putIterator(
      blockId: BlockId,
      values: Iterator[Any],
      level: StorageLevel,
      returnValues: Boolean): PutResult = {

    logDebug(s"Attempting to write values for block $blockId")
    val startTime = System.currentTimeMillis
    val file = diskManager.getFile(blockId)
    val outputStream = new FileOutputStream(file)
    try {
      Utils.tryWithSafeFinally {
        blockManager.dataSerializeStream(blockId, outputStream, values)
      } {
        // Close outputStream here because it should be closed before file is deleted.
        outputStream.close()
      }
    } catch {
      case e: Throwable =>
        if (file.exists()) {
          if (!file.delete()) {
            logWarning(s"Error deleting $file")
          }
        }
        throw e
    }

    val length = file.length

    val timeTaken = System.currentTimeMillis - startTime
    logDebug("Block %s stored as %s file on disk in %d ms".format(
      file.getName, Utils.bytesToString(length), timeTaken))

    if (returnValues) {
      // Return a byte buffer for the contents of the file
      val buffer = getBytes(blockId).get
      PutResult(length, Right(buffer))
    } else {
      PutResult(length, null)
    }
  }
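
BlockManager.dataSerializeStream hides the configured Serializer and CompressionCodec, so the exact bytes written depend on the Spark configuration. As a rough, hypothetical stand-in that only shows the shape of the operation (plain Java serialization plus GZIP, which is not what Spark actually uses):

  import java.io.{File, FileOutputStream, ObjectOutputStream}
  import java.util.zip.GZIPOutputStream

  // Hypothetical stand-in for blockManager.dataSerializeStream:
  // compress the stream, then serialize each value into it.
  // Values must be java.io.Serializable for this sketch to work.
  def serializeToFile(path: String, values: Iterator[Any]): Long = {
    val out = new ObjectOutputStream(
      new GZIPOutputStream(new FileOutputStream(path)))
    try {
      values.foreach(v => out.writeObject(v))
    } finally {
      out.close()
    }
    new File(path).length  // the file length that putIterator reports back
  }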

Reference: 《深入理解Spark核心思想与源码分析》
