内存存储 MemoryStore

Posted 大冰的小屋

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了内存存储 MemoryStore相关的知识,希望对你有一定的参考价值。

MemoryStroe负责将没有序列化的Java对象数组或者序列化的ByteBuffer存储到内存中。
主要包含的内容:
1. entries : 存储Block数据的Map,key 为 BlockId,value 为 MemoryEntry,并能根据存储的先后顺序访问;

  private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)

2.unrollMemoryMap: 当前Driver或者Executor中所有线程展开的Block都存入Map中,key为线程Id,value为线程展开的所有块的内存的大小总和;

  // A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes)
  // All accesses of this map are assumed to have manually synchronized on `memoryManager`
  private val unrollMemoryMap = mutable.HashMap[Long, Long]()

3.pendingUnrollMemoryMap :存放Block已经展开但是还没有放入缓存的内存

  // Same as `unrollMemoryMap`, but for pending unroll memory as defined below.
  // Pending unroll memory refers to the intermediate memory occupied by a task
  // after the unroll but before the actual putting of the block in the cache.
  // This chunk of memory is expected to be released *as soon as* we finish
  // caching the corresponding block as opposed to until after the task finishes.
  // This is only used if a block is successfully unrolled in its entirety in
  // memory (SPARK-4777).
  private val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]()

4.unrollMemoryThreshold: 每个线程用来展开Block的初始内存阈值,通过spark.storage.unrollMemoryThreshold属性设置大小;

  // Initial memory to request before unrolling any block
  private val unrollMemoryThreshold: Long =
    conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)

5.maxMemory: 当前Driver或者Executor的最大可用内存;

  /** Total amount of memory available for storage, in bytes. */
  private def maxMemory: Long = memoryManager.maxStorageMemory

6.memoryUsed: 当前Driver或Executor已经使用放到的内存;

  /** Total storage memory used including unroll memory, in bytes. */
  private def memoryUsed: Long = memoryManager.storageMemoryUsed

7.currentUnrollMemory: 所有展开的Block的内存之和,即当前Driver或者Executor中所有线程展开的Block的内存之和;

  /**
   * Return the amount of memory currently occupied for unrolling blocks across all tasks.
   */
  def currentUnrollMemory: Long = memoryManager.synchronized 
    unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum
  

8.blocksMemoryUsed:当前Driver或者Executor中的Blocks已经使用的内存

  /**
   * Amount of storage memory, in bytes, used for caching blocks.
   * This does not include memory used for unrolling.
   */
  private def blocksMemoryUsed: Long = memoryManager.synchronized 
    memoryUsed - currentUnrollMemory
  

1. 数据存储方法 putBytes

如果Block可以被反序列化,那么先对Block反序列化,然后调用putIterator;否则调用tryToPut方法。

  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = 
    // Work on a duplicate - since the original input might be used elsewhere.
    val bytes = _bytes.duplicate()
    bytes.rewind()
    if (level.deserialized) 
      val values = blockManager.dataDeserialize(blockId, bytes)
      putIterator(blockId, values, level, returnValues = true)
     else 
      val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
      tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks)
      PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks)
    
  

2. putIterator方法

调用unrollSafely将块在内存中安全展开,如果返回数据匹配Left(arrayValues),则说明内存足够并调用putArray方法写入内存;如果返回的是Right(iteratorValues),则说明内存不够并则选择写入磁盘或者放弃存储;

  /**
   * Attempt to put the given block in memory store.
   *
   * There may not be enough space to fully unroll the iterator in memory, in which case we
   * optionally drop the values to disk if
   *   (1) the block's storage level specifies useDisk, and
   *   (2) `allowPersistToDisk` is true.
   *
   * One scenario in which `allowPersistToDisk` is false is when the BlockManager reads a block
   * back from disk and attempts to cache it in memory. In this case, we should not persist the
   * block back on disk again, as it is already in disk store.
   */
  private[storage] def putIterator(
      blockId: BlockId,
      values: Iterator[Any],
      level: StorageLevel,
      returnValues: Boolean,
      allowPersistToDisk: Boolean): PutResult = 
    val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
    val unrolledValues = unrollSafely(blockId, values, droppedBlocks)
    unrolledValues match 
      case Left(arrayValues) =>
        // Values are fully unrolled in memory, so store them as an array
        val res = putArray(blockId, arrayValues, level, returnValues)
        droppedBlocks ++= res.droppedBlocks
        PutResult(res.size, res.data, droppedBlocks)
      case Right(iteratorValues) =>
        // Not enough space to unroll this block; drop to disk if applicable
        if (level.useDisk && allowPersistToDisk) 
          logWarning(s"Persisting block $blockId to disk instead.")
          val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
          PutResult(res.size, res.data, droppedBlocks)
         else 
          PutResult(0, Left(iteratorValues), droppedBlocks)
        
    
  

3. 安全展开方法 unrollSafely

为了防止写入内存的数据过大,导致内存溢出,Spark采用了一种优化方案:在正式写入内存之前,先用逻辑方式申请内存,如果申请成功再写入内存。展开步骤如下:
1. 申请 memoryThreshold的初始大小为initialMemoryThreshold ;
2. 循环判断传入的vlues(Iterator[Any])中是否还有元素并且keepUnrolling 为true,是的话则向vector中添加values中的对象,memoryThreshold加1。如果values中没有元素或者keepUnrolling 为false,则跳转至第4步;
3. 其中在向vector中增加对象后,需要判断elementsUnrolled % memoryCheckPeriod是否为0,是的话,则开始检查currentSize(vector.estimateSize())是否已经比memoryThreshold大。如果currentSize大于initialMemoryThreshold,则需要再申请内存,申请内存大小为currentSize * memoryGrowthFactor - memoryThreshold。
4. 循环结束后,如果keepUnrolling 为true则可以将传入的value的数据完整放入内存,返回Left(vector.toArray);否则说明不能将传入的value的数据完整放入内存,进行返回Right(vector.iterator ++ values)。
5. 在finally中更新状态

  /**
   * Unroll the given block in memory safely.
   *
   * The safety of this operation refers to avoiding potential OOM exceptions caused by
   * unrolling the entirety of the block in memory at once. This is achieved by periodically
   * checking whether the memory restrictions for unrolling blocks are still satisfied,
   * stopping immediately if not. This check is a safeguard against the scenario in which
   * there is not enough free memory to accommodate the entirety of a single block.
   *
   * This method returns either an array with the contents of the entire block or an iterator
   * containing the values of the block (if the array would have exceeded available memory).
   */
  def unrollSafely(
      blockId: BlockId,
      values: Iterator[Any],
      droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
    : Either[Array[Any], Iterator[Any]] = 

    // Number of elements unrolled so far
    var elementsUnrolled = 0
    // Whether there is still enough memory for us to continue unrolling this block
    var keepUnrolling = true
    // Initial per-task memory to request for unrolling blocks (bytes). Exposed for testing.
    val initialMemoryThreshold = unrollMemoryThreshold
    // How often to check whether we need to request more memory
    val memoryCheckPeriod = 16
    // Memory currently reserved by this task for this particular unrolling operation
    var memoryThreshold = initialMemoryThreshold
    // Memory to request as a multiple of current vector size
    val memoryGrowthFactor = 1.5
    // Previous unroll memory held by this task, for releasing later (only at the very end)
    val previousMemoryReserved = currentUnrollMemoryForThisTask
    // Underlying vector for unrolling the block
    var vector = new SizeTrackingVector[Any]

    // Request enough memory to begin unrolling
    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, droppedBlocks)

    if (!keepUnrolling) 
      logWarning(s"Failed to reserve initial memory threshold of " +
        s"$Utils.bytesToString(initialMemoryThreshold) for computing block $blockId in memory.")
    

    // Unroll this block safely, checking whether we have exceeded our threshold periodically
    try 
      while (values.hasNext && keepUnrolling) 
        vector += values.next()
        if (elementsUnrolled % memoryCheckPeriod == 0) 
          // If our vector's size has exceeded the threshold, request more memory
          val currentSize = vector.estimateSize()
          if (currentSize >= memoryThreshold) 
            val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
            keepUnrolling = reserveUnrollMemoryForThisTask(
              blockId, amountToRequest, droppedBlocks)
            // New threshold is currentSize * memoryGrowthFactor
            memoryThreshold += amountToRequest
          
        
        elementsUnrolled += 1
      

      if (keepUnrolling) 
        // We successfully unrolled the entirety of this block
        Left(vector.toArray)
       else 
        // We ran out of space while unrolling the values for this block
        logUnrollFailureMessage(blockId, vector.estimateSize())
        Right(vector.iterator ++ values)
      

     finally 
      // If we return an array, the values returned here will be cached in `tryToPut` later.
      // In this case, we should release the memory only after we cache the block there.
      if (keepUnrolling) 
        val taskAttemptId = currentTaskAttemptId()
        memoryManager.synchronized 
          // Since we continue to hold onto the array until we actually cache it, we cannot
          // release the unroll memory yet. Instead, we transfer it to pending unroll memory
          // so `tryToPut` can further transfer it to normal storage memory later.
          // TODO: we can probably express this without pending unroll memory (SPARK-10907)
          val amountToTransferToPending = currentUnrollMemoryForThisTask - previousMemoryReserved
          unrollMemoryMap(taskAttemptId) -= amountToTransferToPending
          pendingUnrollMemoryMap(taskAttemptId) =
            pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + amountToTransferToPending
        
       else 
        // Otherwise, if we return an iterator, we can only release the unroll memory when
        // the task finishes since we don't know when the iterator will be consumed.
      
    
  

4. 确认空闲内存方法 evictBlocksToFreeSpace

evictBlocksToFreeSpace方法用于确保是否有足够的内存,如果不足,则会释放被MemoryEntry占用的内存。
space需要释放的内存大小; selectedBlocks用于存放已经选择要从内存中释放的Block对应的BlockId的数组;freedMemory记录selectedBlocks中所有块的总大小;
处理步骤如下:
1. 统计要释放的内存块和总的大小;
2. 如果freedMemory大于了space,则说明此时已经有足够的内存了,不需要在继续释放内存空间,通过调用blockManager的dropFromMemory释放Block;否则返回失败。

  /**
    * Try to evict blocks to free up a given amount of space to store a particular block.
    * Can fail if either the block is bigger than our memory or it would require replacing
    * another block from the same RDD (which leads to a wasteful cyclic replacement pattern for
    * RDDs that don't fit into memory that we want to avoid).
    *
    * @param blockId the ID of the block we are freeing space for, if any
    * @param space the size of this block
    * @param droppedBlocks a holder for blocks evicted in the process
    * @return whether the requested free space is freed.
    */
  private[spark] def evictBlocksToFreeSpace(
      blockId: Option[BlockId],
      space: Long,
      droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = 
    assert(space > 0)
    memoryManager.synchronized 
      var freedMemory = 0L
      val rddToAdd = blockId.flatMap(getRddId)
      val selectedBlocks = new ArrayBuffer[BlockId]
      // This is synchronized to ensure that the set of entries is not changed
      // (because of getValue or getBytes) while traversing the iterator, as that
      // can lead to exceptions.
      entries.synchronized 
        val iterator = entries.entrySet().iterator()
        while (freedMemory < space && iterator.hasNext) 
          val pair = iterator.next()
          val blockId = pair.getKey
          if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) 
            selectedBlocks += blockId
            freedMemory += pair.getValue.size
          
        
      

      if (freedMemory >= space) 
        logInfo(s"$selectedBlocks.size blocks selected for dropping")
        for (blockId <- selectedBlocks) 
          val entry = entries.synchronized  entries.get(blockId) 
          // This should never be null as only one task should be dropping
          // blocks and removing entries. However the check is still here for
          // future safety.
          if (entry != null) 
            val data = if (entry.deserialized) 
              Left(entry.value.asInstanceOf[Array[Any]])
             else 
              Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
            
            val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
            droppedBlockStatus.foreach  status => droppedBlocks += ((blockId, status)) 
          
        
        true
       else 
        blockId.foreach  id =>
          logInfo(s"Will not store $id as it would require dropping another block " +
            "from the same RDD")
        
        false
      
    
  

5. 内存写入方法 putArray

putArray首先对对象大小进行估算,然后写入内存。如果unrollSafely返回Left(arrayValues),整个Block可以一次性放入内存中。

  override def putArray(
      blockId: BlockId,
      values: Array[Any],
      level: StorageLevel,
      returnValues: Boolean): PutResult = 
    val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
    if (level.deserialized) 
      val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
      tryToPut(blockId, values, sizeEstimate, deserialized = true, droppedBlocks)
      PutResult(sizeEstimate, Left(values.iterator), droppedBlocks)
     else 
      val bytes = blockManager.dataSerialize(blockId, values.iterator)
      tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks)
      PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks)
    
  

6. 尝试写入内存方法 tryToPut

当Block不支持序列化时,会调用tryToPut方法。tryToPut首先会调用releasePendingUnrollMemoryForThisTask方法清理当前task已经展开的block对应的预展开的内存,释放更多的空间。如果内存充足或者迁移其他内存Block后有足够的内存,则会创建MemoryEnrty对象,并将此对象与其blockId放入entries中。如果此时内存不足,还要把此block对应的MemoryEntry对象迁移到磁盘或者清除。

  /**
   * Try to put in a set of values, if we can free up enough space. The value should either be
   * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
   * must also be passed by the caller.
   *
   * `value` will be lazily created. If it cannot be put into MemoryStore or disk, `value` won't be
   * created to avoid OOM since it may be a big ByteBuffer.
   *
   * Synchronize on `memoryManager` to ensure that all the put requests and its associated block
   * dropping is done by only on thread at a time. Otherwise while one thread is dropping
   * blocks to free memory for one block, another thread may use up the freed space for
   * another block.
   *
   * All blocks evicted in the process, if any, will be added to `droppedBlocks`.
   *
   * @return whether put was successful.
   */
  private def tryToPut(
      blockId: BlockId,
      value: () => Any,
      size: Long,
      deserialized: Boolean,
      droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = 

    /* TODO: Its possible to optimize the locking by locking entries only when selecting blocks
     * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has
     * been released, it must be ensured that those to-be-dropped blocks are not double counted
     * for freeing up more space for another block that needs to be put. Only then the actually
     * dropping of blocks (and writing to disk if necessary) can proceed in parallel. */

    memoryManager.synchronized 
      // Note: if we have previously unrolled this block successfully, then pending unroll
      // memory should be non-zero. This is the amount that we already reserved during the
      // unrolling process. In this case, we can just reuse this space to cache our block.
      // The synchronization on `memoryManager` here guarantees that the release and acquire
      // happen atomically. This relies on the assumption that all memory acquisitions are
      // synchronized on the same lock.
      releasePendingUnrollMemoryForThisTask()
      val enoughMemory = memoryManager.acquireStorageMemory(blockId, size, droppedBlocks)
      if (enoughMemory) 
        // We acquired enough memory for the block, so go ahead and put it
        val entry = new MemoryEntry(value(), size, deserialized)
        entries.synchronized 
          entries.put(blockId, entry)
        
        val valuesOrBytes = if (deserialized) "values" else "bytes"
        logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
          blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed)))
       else 
        // Tell the block manager that we couldn't put it in memory so that it can drop it to
        // disk if the block allows disk storage.
        lazy val data = if (deserialized) 
          Left(value().asInstanceOf[Array[Any]])
         else 
          Right(value().asInstanceOf[ByteBuffer].duplicate())
        
        val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
        droppedBlockStatus.foreach  status => droppedBlocks += ((blockId, status)) 
      
      enoughMemory
    
  

7.获取内存数据方法 getBytes

如果MemoryEntry支持反序列化,则将MemoryEntry的value反序列化后返回,否则对MemoryEntry的value复制后返回。

  override def getBytes(blockId: BlockId): Option[ByteBuffer] = 
    val entry = entries.synchronized 
      entries.get(blockId)
    
    if (entry == null) 
      None
     else if (entry.deserialized) 
      Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[Array[Any]].iterator))
     else 
      Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data
    
  

8. 获取数据方法 getValues

getValues也用于从内存中获取数据,即从entries中获取MemoryEntryz,并将blockId和value返回。

  override def getValues(blockId: BlockId): Option[Iterator[Any]] = 
    val entry = entries.synchronized 
      entries.get(blockId)
    
    if (entry == null) 
      None
     else if (entry.deserialized) 
      Some(entry.value.asInstanceOf[Array[Any]].iterator)
     else 
      val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data
      Some(blockManager.dataDeserialize(blockId, buffer))
    
  

参考 深入理解Spark核心思想与源码分析

以上是关于内存存储 MemoryStore的主要内容,如果未能解决你的问题,请参考以下文章

内存存储 MemoryStore

为什么Rails的ActiveSupport :: Cache :: MemoryStore不适合大型应用程序?

在 Node 中使用 MemoryStore 存储会话数据,类似于 PHP 中的 $_SESSION['data'] = value

警告:connect.session() MemoryStore 不是为生产环境设计的,因为它会泄漏内存,并且不会扩展到单个进程

具有内存存储的核心数据

Spark存储系统详解