Disk Storage: DiskStore
Posted by 大冰的小屋
Preface: this article was compiled by the editors of 小常识网 (cha138.com) and introduces Spark's disk store, DiskStore.
When MemoryStore does not have enough space, blocks are written to disk through DiskStore. DiskStore extends BlockStore and implements methods such as getBytes, putBytes, putArray and putIterator.
val minMemoryMapBytes = blockManager.conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
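The spark.storage.memoryMapThreshold property (default 2m) sets the block size at or above which Spark memory-maps a block when reading it from disk. The threshold keeps Spark from memory-mapping very small blocks, because memory mapping has high overhead for blocks whose size is close to or below the operating system's page size. Block files smaller than the threshold are read directly into a byte buffer instead of being memory-mapped.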
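To make the threshold rule concrete, here is a minimal sketch of the size check. `MemoryMapThreshold`, `parseSize` and `useMemoryMap` are hypothetical names. `parseSize` only understands plain byte counts and the `k`/`m`/`g` suffixes with 1024-based units. It is not Spark's own size parser, which accepts more formats. `useMemoryMap` mirrors the `length < minMemoryMapBytes` comparison used in getBytes.

```scala
object MemoryMapThreshold {
  // Hypothetical, simplified size parser: "2m" -> 2 * 1024 * 1024 bytes.
  // Only plain numbers and the k/m/g suffixes are handled here.
  def parseSize(s: String): Long = {
    val trimmed = s.trim.toLowerCase
    val (digits, multiplier) = trimmed.last match {
      case 'k' => (trimmed.init, 1024L)
      case 'm' => (trimmed.init, 1024L * 1024)
      case 'g' => (trimmed.init, 1024L * 1024 * 1024)
      case _   => (trimmed, 1L)
    }
    digits.toLong * multiplier
  }

  // Same rule as getBytes: blocks below the threshold are copied into a
  // heap buffer; blocks at or above it are memory-mapped.
  def useMemoryMap(length: Long, thresholdBytes: Long): Boolean =
    length >= thresholdBytes
}
```

With the default of `2m`, a 4 KB block is copied into a heap ByteBuffer, while a 3 MB block is mapped.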
1. NIO read method: getBytes
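// Imports used by the DiskStore excerpts in this article
import java.io.{File, FileOutputStream, IOException, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode
import org.apache.spark.util.Utils

private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
  val channel = new RandomAccessFile(file, "r").getChannel
  Utils.tryWithSafeFinally {
    // For small files, directly read rather than memory map
    if (length < minMemoryMapBytes) {
      val buf = ByteBuffer.allocate(length.toInt)
      channel.position(offset)
      // A single read() may return fewer bytes than requested, so keep reading
      while (buf.remaining() != 0) {
        if (channel.read(buf) == -1) {
          throw new IOException("Reached EOF before filling buffer\n" +
            s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
        }
      }
      buf.flip()
      Some(buf)
    } else {
      Some(channel.map(MapMode.READ_ONLY, offset, length))
    }
  } {
    channel.close()
  }
}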
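The read path can be tried outside Spark with plain java.nio. The sketch below is a standalone, hypothetical `RegionReader` that follows the same split as getBytes. It uses an ordinary `try`/`finally` in place of Spark's `Utils.tryWithSafeFinally` and takes the threshold as a parameter instead of reading it from the configuration.

```scala
import java.io.{File, IOException, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode

object RegionReader {
  // Reads `length` bytes starting at `offset`. Regions smaller than
  // `minMemoryMapBytes` are copied into a heap buffer; larger ones are
  // mapped read-only, following the same split as DiskStore.getBytes.
  def readRegion(file: File, offset: Long, length: Long, minMemoryMapBytes: Long): ByteBuffer = {
    val channel = new RandomAccessFile(file, "r").getChannel
    try {
      if (length < minMemoryMapBytes) {
        val buf = ByteBuffer.allocate(length.toInt)
        channel.position(offset)
        // A single read() may return fewer bytes than requested, so loop.
        while (buf.remaining() != 0) {
          if (channel.read(buf) == -1) {
            throw new IOException(
              s"Reached EOF before filling buffer, offset=$offset, file=${file.getAbsolutePath}")
          }
        }
        buf.flip()
        buf
      } else {
        channel.map(MapMode.READ_ONLY, offset, length)
      }
    } finally {
      // A mapping stays valid after its channel is closed.
      channel.close()
    }
  }
}
```

Both branches return a buffer positioned at 0 with the same readable content. Only where the bytes live differs: the heap in one case, the page cache via the mapping in the other.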
2. NIO write method: putBytes
putBytes obtains the block's file through DiskBlockManager's getFile method and then writes the ByteBuffer to that file through an NIO FileChannel.
override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
  // So that we do not modify the input offsets !
  // duplicate does not copy buffer, so inexpensive
  val bytes = _bytes.duplicate()
  logDebug(s"Attempting to put block $blockId")
  val startTime = System.currentTimeMillis
  val file = diskManager.getFile(blockId)
  val channel = new FileOutputStream(file).getChannel
  Utils.tryWithSafeFinally {
    while (bytes.remaining > 0) {
      channel.write(bytes)
    }
  } {
    channel.close()
  }
  val finishTime = System.currentTimeMillis
  logDebug("Block %s stored as %s file on disk in %d ms".format(
    file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
  PutResult(bytes.limit(), Right(bytes.duplicate()))
}
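The `duplicate()` trick and the write loop can be shown in isolation. The following is a minimal standalone sketch with a hypothetical `BufferWriter` object, no Spark types and no logging. It returns `remaining()` measured before the write. putBytes reports `limit()` instead, and the two agree only when the incoming buffer's position is 0.

```scala
import java.io.{File, FileOutputStream}
import java.nio.ByteBuffer

object BufferWriter {
  // Writes the readable bytes of `input` to `file` and returns the byte count.
  // duplicate() shares the content but has its own position/limit, so the
  // caller's buffer is left untouched (the same idea putBytes relies on).
  def writeBuffer(file: File, input: ByteBuffer): Long = {
    val bytes = input.duplicate()
    val written = bytes.remaining().toLong
    val channel = new FileOutputStream(file).getChannel
    try {
      // FileChannel.write may write only part of the buffer, so loop.
      while (bytes.remaining > 0) {
        channel.write(bytes)
      }
    } finally {
      channel.close()
    }
    written
  }
}
```

After the call, the caller's buffer still has position 0. It can therefore be handed back or read again, which is why putBytes can return a duplicate of it in its PutResult.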
3. Array write method: putArray
putArray simply delegates to putIterator:
override def putArray(
    blockId: BlockId,
    values: Array[Any],
    level: StorageLevel,
    returnValues: Boolean): PutResult = {
  putIterator(blockId, values.toIterator, level, returnValues)
}
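4. Iterator write method: putIterator
The write proceeds as follows:
1. Use DiskBlockManager's getFile method to obtain the Block file for the blockId, and open it as a FileOutputStream;
2. Call BlockManager's dataSerializeStream to serialize the values into that FileOutputStream, compressing them if compression is enabled for this kind of block; if writing fails, the partially written file is deleted and the exception is rethrown;
3. If returnValues is true, read the file back into a ByteBuffer with getBytes and return it in a PutResult together with the file length; otherwise return only the file length.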
override def putIterator(
    blockId: BlockId,
    values: Iterator[Any],
    level: StorageLevel,
    returnValues: Boolean): PutResult = {
  logDebug(s"Attempting to write values for block $blockId")
  val startTime = System.currentTimeMillis
  val file = diskManager.getFile(blockId)
  val outputStream = new FileOutputStream(file)
  try {
    Utils.tryWithSafeFinally {
      blockManager.dataSerializeStream(blockId, outputStream, values)
    } {
      // Close outputStream here because it should be closed before file is deleted.
      outputStream.close()
    }
  } catch {
    case e: Throwable =>
      if (file.exists()) {
        if (!file.delete()) {
          logWarning(s"Error deleting $file")
        }
      }
      throw e
  }
  val length = file.length
  val timeTaken = System.currentTimeMillis - startTime
  logDebug("Block %s stored as %s file on disk in %d ms".format(
    file.getName, Utils.bytesToString(length), timeTaken))
  if (returnValues) {
    // Return a byte buffer for the contents of the file
    val buffer = getBytes(blockId).get
    PutResult(length, Right(buffer))
  } else {
    PutResult(length, null)
  }
}
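The first two write steps of putIterator can be sketched without Spark: open the file, serialize every value into it, and on failure close the stream and then delete the partial file. In this hypothetical `IteratorWriter`, Java serialization (`ObjectOutputStream`) and `GZIPOutputStream` are stand-ins chosen for the sketch. They are not what dataSerializeStream uses, which is the serializer and compression codec configured for Spark. Logging is replaced by `System.err`, and the optional read-back through getBytes is left out.

```scala
import java.io.{BufferedOutputStream, File, FileOutputStream, ObjectOutputStream}
import java.util.zip.GZIPOutputStream

object IteratorWriter {
  // Serializes every value to `file` and returns the resulting file length.
  // Java serialization + GZIP stand in for Spark's configured serializer
  // and compression codec.
  def writeValues(file: File, values: Iterator[Any]): Long = {
    val fileStream = new FileOutputStream(file)
    try {
      val out = new ObjectOutputStream(
        new GZIPOutputStream(new BufferedOutputStream(fileStream)))
      try {
        values.foreach(v => out.writeObject(v.asInstanceOf[AnyRef]))
      } finally {
        // Closing the outer stream finishes the GZIP trailer and closes the
        // file, which must happen before any attempt to delete it.
        out.close()
      }
    } catch {
      case e: Throwable =>
        fileStream.close() // no-op if already closed
        if (file.exists() && !file.delete()) {
          System.err.println(s"Error deleting $file")
        }
        throw e
    }
    file.length
  }
}
```

The close-before-delete order matters in practice. On some platforms, notably Windows, a file that still has an open handle cannot be deleted. The comment in the Spark code points at the same concern.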