In-Memory Storage: MemoryStore
Posted by 大冰的小屋
MemoryStore is responsible for storing blocks in memory, either as arrays of deserialized Java objects or as serialized ByteBuffers.
Its main members are:
1. entries: the map that holds the block data, keyed by BlockId with MemoryEntry values. It is a LinkedHashMap created with accessOrder = true, so iterating over it visits the least recently used blocks first, which the eviction logic below relies on (see the LinkedHashMap sketch after this list);
private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)
2. unrollMemoryMap: records the memory reserved for unrolling blocks by every task running in the current Driver or Executor; the key is the task attempt ID and the value is the total unroll memory held by that task;
// A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes)
// All accesses of this map are assumed to have manually synchronized on `memoryManager`
private val unrollMemoryMap = mutable.HashMap[Long, Long]()
3. pendingUnrollMemoryMap: records, per task, the unroll memory of blocks that have been fully unrolled but not yet put into the cache;
// Same as `unrollMemoryMap`, but for pending unroll memory as defined below.
// Pending unroll memory refers to the intermediate memory occupied by a task
// after the unroll but before the actual putting of the block in the cache.
// This chunk of memory is expected to be released *as soon as* we finish
// caching the corresponding block as opposed to until after the task finishes.
// This is only used if a block is successfully unrolled in its entirety in
// memory (SPARK-4777).
private val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]()
4. unrollMemoryThreshold: the initial amount of memory each task requests before unrolling a block, configured via spark.storage.unrollMemoryThreshold (see the SparkConf sketch after this list);
// Initial memory to request before unrolling any block
private val unrollMemoryThreshold: Long =
conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
5. maxMemory: the maximum amount of storage memory available to the current Driver or Executor;
/** Total amount of memory available for storage, in bytes. */
private def maxMemory: Long = memoryManager.maxStorageMemory
6. memoryUsed: the amount of storage memory already in use by the current Driver or Executor, including unroll memory;
/** Total storage memory used including unroll memory, in bytes. */
private def memoryUsed: Long = memoryManager.storageMemoryUsed
7. currentUnrollMemory: the total memory occupied by unrolled blocks, i.e. the sum of the unroll memory (including pending unroll memory) held by all tasks in the current Driver or Executor;
/**
* Return the amount of memory currently occupied for unrolling blocks across all tasks.
*/
def currentUnrollMemory: Long = memoryManager.synchronized {
  unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum
}
8. blocksMemoryUsed: the memory used by cached blocks in the current Driver or Executor, i.e. memoryUsed minus currentUnrollMemory.
/**
* Amount of storage memory, in bytes, used for caching blocks.
* This does not include memory used for unrolling.
*/
private def blocksMemoryUsed: Long = memoryManager.synchronized {
  memoryUsed - currentUnrollMemory
}
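The access-order behaviour of entries is easy to miss. Below is a minimal, self-contained sketch (plain JDK usage, not Spark code; the block IDs are made up) showing that with accessOrder = true a get() moves an entry to the back of the iteration order, so a traversal sees the least recently used entries first.

import java.util.LinkedHashMap

object AccessOrderDemo {
  def main(args: Array[String]): Unit = {
    // Same constructor arguments as `entries`: capacity 32, load factor 0.75, accessOrder = true.
    val map = new LinkedHashMap[String, Long](32, 0.75f, true)
    map.put("rdd_0_0", 100L)
    map.put("rdd_0_1", 200L)
    map.put("rdd_0_2", 300L)

    // Touching a key moves it to the end of the iteration order.
    map.get("rdd_0_0")

    val it = map.entrySet().iterator()
    while (it.hasNext) {
      // Prints rdd_0_1, rdd_0_2, rdd_0_0: least recently used first.
      println(it.next().getKey)
    }
  }
}

As for unrollMemoryThreshold, the property can be raised when individual records are large. A sketch of doing so through SparkConf (the application name, master and the 2 MB value are arbitrary examples, not taken from the article):

import org.apache.spark.{SparkConf, SparkContext}

object UnrollThresholdExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("unroll-threshold-example")
      .setMaster("local[*]")
      // Raise the initial per-task unroll reservation from the 1 MB default to 2 MB.
      .set("spark.storage.unrollMemoryThreshold", (2 * 1024 * 1024).toString)
    val sc = new SparkContext(conf)
    try {
      // Blocks cached from here on start unrolling with the larger initial reservation.
      sc.parallelize(1 to 1000000).cache().count()
    } finally {
      sc.stop()
    }
  }
}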
1. putBytes: storing data
If the block's storage level is deserialized, the bytes are deserialized first and putIterator is called; otherwise tryToPut is called directly.
override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
  // Work on a duplicate - since the original input might be used elsewhere.
  val bytes = _bytes.duplicate()
  bytes.rewind()
  if (level.deserialized) {
    val values = blockManager.dataDeserialize(blockId, bytes)
    putIterator(blockId, values, level, returnValues = true)
  } else {
    val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
    tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks)
    PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks)
  }
}
2. putIterator
putIterator calls unrollSafely to unroll the block safely in memory. If the result matches Left(arrayValues), the block fits entirely in memory and putArray is called to write it; if the result is Right(iteratorValues), there was not enough memory, and the block is either written to disk (if its storage level allows it) or not stored at all.
/**
* Attempt to put the given block in memory store.
*
* There may not be enough space to fully unroll the iterator in memory, in which case we
* optionally drop the values to disk if
* (1) the block's storage level specifies useDisk, and
* (2) `allowPersistToDisk` is true.
*
* One scenario in which `allowPersistToDisk` is false is when the BlockManager reads a block
* back from disk and attempts to cache it in memory. In this case, we should not persist the
* block back on disk again, as it is already in disk store.
*/
private[storage] def putIterator(
    blockId: BlockId,
    values: Iterator[Any],
    level: StorageLevel,
    returnValues: Boolean,
    allowPersistToDisk: Boolean): PutResult = {
  val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
  val unrolledValues = unrollSafely(blockId, values, droppedBlocks)
  unrolledValues match {
    case Left(arrayValues) =>
      // Values are fully unrolled in memory, so store them as an array
      val res = putArray(blockId, arrayValues, level, returnValues)
      droppedBlocks ++= res.droppedBlocks
      PutResult(res.size, res.data, droppedBlocks)
    case Right(iteratorValues) =>
      // Not enough space to unroll this block; drop to disk if applicable
      if (level.useDisk && allowPersistToDisk) {
        logWarning(s"Persisting block $blockId to disk instead.")
        val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
        PutResult(res.size, res.data, droppedBlocks)
      } else {
        PutResult(0, Left(iteratorValues), droppedBlocks)
      }
  }
}
3. unrollSafely: unrolling a block safely
To avoid an OOM when writing a large block into memory, Spark uses an optimization: before actually materializing the data, it logically reserves memory in increments and only proceeds while those reservations succeed. The unroll steps are:
1. Reserve an initial amount of unroll memory equal to initialMemoryThreshold, which also becomes the starting value of memoryThreshold;
2. Loop while the incoming values (Iterator[Any]) still has elements and keepUnrolling is true: append the next element to vector and increment elementsUnrolled. When values is exhausted or keepUnrolling becomes false, go to step 4;
3. Inside the loop, whenever elementsUnrolled % memoryCheckPeriod == 0, check whether currentSize (vector.estimateSize()) has reached memoryThreshold. If it has, request additional memory of currentSize * memoryGrowthFactor - memoryThreshold and raise memoryThreshold by the same amount (a worked example of this arithmetic follows the code below);
4. After the loop, if keepUnrolling is still true the whole block fits in memory and Left(vector.toArray) is returned; otherwise the block could not be fully unrolled and Right(vector.iterator ++ values) is returned;
5. In the finally clause, the unroll-memory bookkeeping is updated.
/**
* Unroll the given block in memory safely.
*
* The safety of this operation refers to avoiding potential OOM exceptions caused by
* unrolling the entirety of the block in memory at once. This is achieved by periodically
* checking whether the memory restrictions for unrolling blocks are still satisfied,
* stopping immediately if not. This check is a safeguard against the scenario in which
* there is not enough free memory to accommodate the entirety of a single block.
*
* This method returns either an array with the contents of the entire block or an iterator
* containing the values of the block (if the array would have exceeded available memory).
*/
def unrollSafely(
    blockId: BlockId,
    values: Iterator[Any],
    droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
  : Either[Array[Any], Iterator[Any]] = {

  // Number of elements unrolled so far
  var elementsUnrolled = 0
  // Whether there is still enough memory for us to continue unrolling this block
  var keepUnrolling = true
  // Initial per-task memory to request for unrolling blocks (bytes). Exposed for testing.
  val initialMemoryThreshold = unrollMemoryThreshold
  // How often to check whether we need to request more memory
  val memoryCheckPeriod = 16
  // Memory currently reserved by this task for this particular unrolling operation
  var memoryThreshold = initialMemoryThreshold
  // Memory to request as a multiple of current vector size
  val memoryGrowthFactor = 1.5
  // Previous unroll memory held by this task, for releasing later (only at the very end)
  val previousMemoryReserved = currentUnrollMemoryForThisTask
  // Underlying vector for unrolling the block
  var vector = new SizeTrackingVector[Any]

  // Request enough memory to begin unrolling
  keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, droppedBlocks)

  if (!keepUnrolling) {
    logWarning(s"Failed to reserve initial memory threshold of " +
      s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
  }

  // Unroll this block safely, checking whether we have exceeded our threshold periodically
  try {
    while (values.hasNext && keepUnrolling) {
      vector += values.next()
      if (elementsUnrolled % memoryCheckPeriod == 0) {
        // If our vector's size has exceeded the threshold, request more memory
        val currentSize = vector.estimateSize()
        if (currentSize >= memoryThreshold) {
          val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
          keepUnrolling = reserveUnrollMemoryForThisTask(
            blockId, amountToRequest, droppedBlocks)
          // New threshold is currentSize * memoryGrowthFactor
          memoryThreshold += amountToRequest
        }
      }
      elementsUnrolled += 1
    }

    if (keepUnrolling) {
      // We successfully unrolled the entirety of this block
      Left(vector.toArray)
    } else {
      // We ran out of space while unrolling the values for this block
      logUnrollFailureMessage(blockId, vector.estimateSize())
      Right(vector.iterator ++ values)
    }

  } finally {
    // If we return an array, the values returned here will be cached in `tryToPut` later.
    // In this case, we should release the memory only after we cache the block there.
    if (keepUnrolling) {
      val taskAttemptId = currentTaskAttemptId()
      memoryManager.synchronized {
        // Since we continue to hold onto the array until we actually cache it, we cannot
        // release the unroll memory yet. Instead, we transfer it to pending unroll memory
        // so `tryToPut` can further transfer it to normal storage memory later.
        // TODO: we can probably express this without pending unroll memory (SPARK-10907)
        val amountToTransferToPending = currentUnrollMemoryForThisTask - previousMemoryReserved
        unrollMemoryMap(taskAttemptId) -= amountToTransferToPending
        pendingUnrollMemoryMap(taskAttemptId) =
          pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + amountToTransferToPending
      }
    } else {
      // Otherwise, if we return an iterator, we can only release the unroll memory when
      // the task finishes since we don't know when the iterator will be consumed.
    }
  }
}
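To make the growth arithmetic concrete, here is a small standalone sketch (not Spark code; the estimated sizes are made up for illustration) that replays the sequence of reservations unrollSafely would make as the estimated vector size crosses successive thresholds. In the real code each successful request is a call to reserveUnrollMemoryForThisTask, and a failed request simply sets keepUnrolling to false.

object UnrollGrowthDemo {
  def main(args: Array[String]): Unit = {
    val initialMemoryThreshold = 1024L * 1024    // 1 MB, the default spark.storage.unrollMemoryThreshold
    val memoryGrowthFactor = 1.5
    var memoryThreshold = initialMemoryThreshold // reserved so far for this unroll
    var totalReserved = initialMemoryThreshold

    // Hypothetical values returned by vector.estimateSize() at successive periodic checks.
    val estimatedSizes = Seq(512L * 1024, 1200L * 1024, 2000L * 1024, 3500L * 1024)

    for (currentSize <- estimatedSizes) {
      if (currentSize >= memoryThreshold) {
        // Same formula as unrollSafely: grow the reservation to currentSize * memoryGrowthFactor.
        val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
        totalReserved += amountToRequest
        memoryThreshold += amountToRequest
        println(s"estimated=$currentSize requested=$amountToRequest newThreshold=$memoryThreshold")
      } else {
        println(s"estimated=$currentSize still under threshold=$memoryThreshold, nothing requested")
      }
    }
    println(s"total unroll memory reserved: $totalReserved bytes")
  }
}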
4. evictBlocksToFreeSpace: freeing up space
evictBlocksToFreeSpace checks whether enough memory can be made available and, if necessary, frees it by dropping blocks and releasing the memory held by their MemoryEntry objects.
space is the amount of memory that needs to be freed; selectedBlocks collects the BlockIds of the blocks chosen for eviction; freedMemory accumulates the total size of the blocks in selectedBlocks.
The procedure is as follows:
1. Walk through entries, selecting blocks to evict (skipping blocks that belong to the same RDD as the block being stored) and summing their sizes until enough memory has been found;
2. If freedMemory has reached space, enough candidates were found; each selected block is then dropped via blockManager.dropFromMemory and the method returns true. Otherwise nothing is evicted and the method returns false.
/**
* Try to evict blocks to free up a given amount of space to store a particular block.
* Can fail if either the block is bigger than our memory or it would require replacing
* another block from the same RDD (which leads to a wasteful cyclic replacement pattern for
* RDDs that don't fit into memory that we want to avoid).
*
* @param blockId the ID of the block we are freeing space for, if any
* @param space the size of this block
* @param droppedBlocks a holder for blocks evicted in the process
* @return whether the requested free space is freed.
*/
private[spark] def evictBlocksToFreeSpace(
    blockId: Option[BlockId],
    space: Long,
    droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
  assert(space > 0)
  memoryManager.synchronized {
    var freedMemory = 0L
    val rddToAdd = blockId.flatMap(getRddId)
    val selectedBlocks = new ArrayBuffer[BlockId]
    // This is synchronized to ensure that the set of entries is not changed
    // (because of getValue or getBytes) while traversing the iterator, as that
    // can lead to exceptions.
    entries.synchronized {
      val iterator = entries.entrySet().iterator()
      while (freedMemory < space && iterator.hasNext) {
        val pair = iterator.next()
        val blockId = pair.getKey
        if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
          selectedBlocks += blockId
          freedMemory += pair.getValue.size
        }
      }
    }

    if (freedMemory >= space) {
      logInfo(s"${selectedBlocks.size} blocks selected for dropping")
      for (blockId <- selectedBlocks) {
        val entry = entries.synchronized { entries.get(blockId) }
        // This should never be null as only one task should be dropping
        // blocks and removing entries. However the check is still here for
        // future safety.
        if (entry != null) {
          val data = if (entry.deserialized) {
            Left(entry.value.asInstanceOf[Array[Any]])
          } else {
            Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
          }
          val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
          droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
        }
      }
      true
    } else {
      blockId.foreach { id =>
        logInfo(s"Will not store $id as it would require dropping another block " +
          "from the same RDD")
      }
      false
    }
  }
}
5. putArray: writing to memory
putArray first estimates the size of the values and then writes them to memory via tryToPut. It is the path taken when unrollSafely returns Left(arrayValues), i.e. when the whole block can be put into memory in one go.
override def putArray(
    blockId: BlockId,
    values: Array[Any],
    level: StorageLevel,
    returnValues: Boolean): PutResult = {
  val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
  if (level.deserialized) {
    val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
    tryToPut(blockId, values, sizeEstimate, deserialized = true, droppedBlocks)
    PutResult(sizeEstimate, Left(values.iterator), droppedBlocks)
  } else {
    val bytes = blockManager.dataSerialize(blockId, values.iterator)
    tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks)
    PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks)
  }
}
6. tryToPut: attempting the write
tryToPut is reached from both putBytes and putArray once the data has been materialized as a ByteBuffer or an Array. It first calls releasePendingUnrollMemoryForThisTask to release the pending unroll memory reserved for blocks this task has already unrolled, freeing up extra space. If enough storage memory is available, possibly after evicting other blocks, a MemoryEntry is created and stored in entries under the blockId. If memory is still insufficient, the block is handed back to the BlockManager via dropFromMemory, which writes it to disk if its storage level allows or discards it otherwise.
/**
* Try to put in a set of values, if we can free up enough space. The value should either be
* an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
* must also be passed by the caller.
*
* `value` will be lazily created. If it cannot be put into MemoryStore or disk, `value` won't be
* created to avoid OOM since it may be a big ByteBuffer.
*
* Synchronize on `memoryManager` to ensure that all the put requests and its associated block
* dropping is done by only one thread at a time. Otherwise while one thread is dropping
* blocks to free memory for one block, another thread may use up the freed space for
* another block.
*
* All blocks evicted in the process, if any, will be added to `droppedBlocks`.
*
* @return whether put was successful.
*/
private def tryToPut(
    blockId: BlockId,
    value: () => Any,
    size: Long,
    deserialized: Boolean,
    droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {

  /* TODO: Its possible to optimize the locking by locking entries only when selecting blocks
   * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has
   * been released, it must be ensured that those to-be-dropped blocks are not double counted
   * for freeing up more space for another block that needs to be put. Only then the actually
   * dropping of blocks (and writing to disk if necessary) can proceed in parallel. */

  memoryManager.synchronized {
    // Note: if we have previously unrolled this block successfully, then pending unroll
    // memory should be non-zero. This is the amount that we already reserved during the
    // unrolling process. In this case, we can just reuse this space to cache our block.
    // The synchronization on `memoryManager` here guarantees that the release and acquire
    // happen atomically. This relies on the assumption that all memory acquisitions are
    // synchronized on the same lock.
    releasePendingUnrollMemoryForThisTask()
    val enoughMemory = memoryManager.acquireStorageMemory(blockId, size, droppedBlocks)
    if (enoughMemory) {
      // We acquired enough memory for the block, so go ahead and put it
      val entry = new MemoryEntry(value(), size, deserialized)
      entries.synchronized {
        entries.put(blockId, entry)
      }
      val valuesOrBytes = if (deserialized) "values" else "bytes"
      logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
        blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed)))
    } else {
      // Tell the block manager that we couldn't put it in memory so that it can drop it to
      // disk if the block allows disk storage.
      lazy val data = if (deserialized) {
        Left(value().asInstanceOf[Array[Any]])
      } else {
        Right(value().asInstanceOf[ByteBuffer].duplicate())
      }
      val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
      droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
    }
    enoughMemory
  }
}
7. getBytes: reading serialized data
If the MemoryEntry holds deserialized values, they are serialized with blockManager.dataSerialize and returned as a ByteBuffer; otherwise a duplicate of the stored ByteBuffer is returned.
override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
  val entry = entries.synchronized {
    entries.get(blockId)
  }
  if (entry == null) {
    None
  } else if (entry.deserialized) {
    Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[Array[Any]].iterator))
  } else {
    Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data
  }
}
8. getValues: reading values
getValues also reads a block from memory: it looks up the MemoryEntry in entries and returns the block's values as an Iterator[Any], deserializing the stored ByteBuffer first when the entry is serialized.
override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
  val entry = entries.synchronized {
    entries.get(blockId)
  }
  if (entry == null) {
    None
  } else if (entry.deserialized) {
    Some(entry.value.asInstanceOf[Array[Any]].iterator)
  } else {
    val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data
    Some(blockManager.dataDeserialize(blockId, buffer))
  }
}
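At the user level, the `deserialized` flag that threads through all the methods above is determined by the StorageLevel used to persist an RDD: roughly, MEMORY_ONLY caches a partition as deserialized objects (the putArray / getValues path), while MEMORY_ONLY_SER caches it as a serialized ByteBuffer (the serialized tryToPut / getBytes path). A short sketch against the Spark 1.x API this article describes (application name and master are arbitrary):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object StorageLevelDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("storage-level-demo").setMaster("local[*]"))
    try {
      val data = sc.parallelize(1 to 100000)

      // level.deserialized is true: MemoryStore keeps an Array[Any] of the objects themselves.
      data.persist(StorageLevel.MEMORY_ONLY).count()

      // level.deserialized is false: MemoryStore keeps a serialized ByteBuffer.
      data.map(_ * 2).persist(StorageLevel.MEMORY_ONLY_SER).count()
    } finally {
      sc.stop()
    }
  }
}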