BlockManager

Posted satyrs


Preface: This article was compiled by the editors of 小常识网 (cha138.com). It mainly covers BlockManager, and we hope it is a useful reference.

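  • dropFromMemory (check the block's info to see whether it can be dropped, drop it, report the result to the master, then clear the info)

1) Look the block up in blockInfo, a TimeStampedHashMap[BlockId, BlockInfo]. If a droppable block is found, read its StorageLevel from the BlockInfo.

2) If the StorageLevel allows disk storage and DiskStore does not yet contain the block, call DiskStore.putArray or putBytes to write the block to disk.

3) Remove the block from MemoryStore.

4) Get the block's current status with getCurrentBlockStatus. If the block's tellMaster flag is true, call reportBlockStatus to report that status to BlockManagerMasterEndpoint.

5) Remove the BlockId from blockInfo (only when the storage level does not use disk, i.e. the block is now completely gone from this node), and return the block's status.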


def dropFromMemory(
    blockId: BlockId,
    data: () => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {

  logInfo(s"Dropping block $blockId from memory")
  val info = blockInfo.get(blockId).orNull
  // ... rest of the method (steps 2-5 above) omitted in this excerpt
}
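The five steps above can be illustrated with a small, self-contained Scala sketch. This is not Spark's code. `ToyBlockManager`, `Info` and `Status` are hypothetical stand-ins, byte arrays held in plain maps replace MemoryStore and DiskStore, and "reporting to the master" just appends to a local buffer.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for BlockInfo and BlockStatus (not Spark's classes)
final case class Info(useDisk: Boolean, tellMaster: Boolean)
final case class Status(inMemory: Boolean, onDisk: Boolean)

final class ToyBlockManager {
  val blockInfo   = mutable.Map[String, Info]()         // BlockId -> BlockInfo
  val memoryStore = mutable.Map[String, Array[Byte]]()
  val diskStore   = mutable.Map[String, Array[Byte]]()
  val reported    = mutable.Buffer[(String, Status)]()  // what we "told the master"

  def dropFromMemory(blockId: String): Option[Status] =
    blockInfo.get(blockId) match {
      case None => None                                 // 1) already dropped: nothing to do
      case Some(info) =>
        var updated = false
        // 2) spill to disk if the level allows it and disk does not have it yet
        if (info.useDisk && !diskStore.contains(blockId)) {
          memoryStore.get(blockId).foreach { bytes => diskStore(blockId) = bytes; updated = true }
        }
        // 3) remove the block from the memory store
        if (memoryStore.remove(blockId).isDefined) updated = true
        // 4) compute the current status and report it if tellMaster is set
        val status = Status(memoryStore.contains(blockId), diskStore.contains(blockId))
        if (info.tellMaster) reported += (blockId -> status)
        // 5) a block that cannot live on disk is now gone from this node: forget it
        if (!info.useDisk) blockInfo.remove(blockId)
        if (updated) Some(status) else None
    }
}
```

A block whose level includes disk survives the drop on disk and keeps its blockInfo entry. A memory-only block disappears from both.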

  • reportBlockStatus

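reportBlockStatus reports a block's status to BlockManagerMasterEndpoint and, when necessary, re-registers the BlockManager. It works as follows:

1) Call tryToReportBlockStatus. This calls BlockManagerMaster's updateBlockInfo method, which sends an UpdateBlockInfo message to BlockManagerMasterEndpoint to update the block's memory size, disk size, storage level, and related information.

2) If the BlockManager is not registered with BlockManagerMasterEndpoint (the update returns false), call asyncReregister. asyncReregister calls reregister, which in turn calls BlockManagerMaster's registerBlockManager method and then reportAllBlocks. reportAllBlocks itself also calls tryToReportBlockStatus.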
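The following is a minimal sketch of this report-then-re-register flow. It assumes a hypothetical `ToyMaster` whose `updateBlockInfo` returns false for an unregistered block manager, which is assumed to mirror how the real master treats an unknown BlockManagerId. Spark re-registers asynchronously, but the sketch does it synchronously to keep it short.

```scala
import scala.collection.mutable

// Hypothetical stand-in for BlockManagerMaster
final class ToyMaster {
  var registered = false
  val blocks = mutable.Map[String, Long]()             // blockId -> memory size
  def registerBlockManager(): Unit = registered = true
  // Returns false when the sender is not registered, so it must re-register
  def updateBlockInfo(blockId: String, memSize: Long): Boolean =
    if (!registered) false else { blocks(blockId) = memSize; true }
}

final class ToyBlockManager(master: ToyMaster) {
  val localBlocks = mutable.Map[String, Long]()        // blocks held on this node

  private def tryToReportBlockStatus(blockId: String): Boolean =
    master.updateBlockInfo(blockId, localBlocks.getOrElse(blockId, 0L))

  // reregister: register again, then report every local block (reportAllBlocks)
  private def reregister(): Unit = {
    master.registerBlockManager()
    localBlocks.keys.foreach(id => tryToReportBlockStatus(id))
  }

  def reportBlockStatus(blockId: String): Unit = {
    val needReregister = !tryToReportBlockStatus(blockId)
    if (needReregister) reregister()                   // re-registering also reports this block
  }
}
```

Because reregister reports all blocks, the block whose update was rejected still reaches the master.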

 

  • putSingle

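putSingle writes a block consisting of a single object into the storage system. It wraps the value in an iterator and passes it down through putIterator, which ultimately calls doPut.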

def putSingle(
    blockId: BlockId,
    value: Any,
    level: StorageLevel,
    tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
  putIterator(blockId, Iterator(value), level, tellMaster)
}

def putIterator(
    blockId: BlockId,
    values: Iterator[Any],
    level: StorageLevel,
    tellMaster: Boolean = true,
    effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
  require(values != null, "Values is null")
  doPut(blockId, IteratorValues(values), level, tellMaster, effectiveStorageLevel)
}

  • putBytes: writing a block of serialized bytes

putBytes writes a block consisting of serialized bytes into the storage system. It also ends up calling doPut.

def putBytes(
    blockId: BlockId,
    bytes: ByteBuffer,
    level: StorageLevel,
    tellMaster: Boolean = true,
    effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
  require(bytes != null, "Bytes is null")
  doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
}
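Both entry points wrap their input before calling doPut, which then pattern-matches on the wrapper type. The sketch below isolates that dispatch. The wrapper names IteratorValues, ArrayValues and ByteBufferValues come from the code in this article. The common sealed trait is assumed here to be called BlockValues. The toy doPut is hypothetical: it only returns the name of the BlockStore method that would be called.

```scala
import java.nio.ByteBuffer

// Wrappers that tag how the block's data is represented
sealed trait BlockValues
final case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
final case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
final case class ArrayValues(buffer: Array[Any]) extends BlockValues

// Hypothetical doPut: reports which store method the real one would invoke
def doPut(data: BlockValues): String = data match {
  case IteratorValues(_)   => "putIterator"
  case ArrayValues(_)      => "putArray"
  case ByteBufferValues(_) => "putBytes"
}

// putSingle wraps one object in an iterator, as in the excerpt above
def putSingle(value: Any): String = doPut(IteratorValues(Iterator(value)))

def putBytes(bytes: ByteBuffer): String = {
  require(bytes != null, "Bytes is null")
  doPut(ByteBufferValues(bytes))
}
```

A sealed trait lets the compiler warn if a match in doPut forgets one of the three cases.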

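  • doPut: the data-writing method

doPut works as follows:

1) Obtain putBlockInfo. If blockInfo already holds a BlockInfo for this block, use the cached one. Otherwise use the newly created BlockInfo. If the existing block is already ready (waitForReady returns true), the block is not written again.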

// Return value
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
val putBlockInfo = {
  val tinfo = new BlockInfo(level, tellMaster)
  // Do atomically !
  val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
  if (oldBlockOpt.isDefined) {
    if (oldBlockOpt.get.waitForReady()) {
      logWarning(s"Block $blockId already exists on this machine; not re-adding it")
      return updatedBlocks
    }
    // TODO: So the block info exists - but previous attempt to load it (?) failed.
    // What do we do now ? Retry on it ?
    oldBlockOpt.get
  } else {
    tinfo
  }
}
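The putIfAbsent/waitForReady handshake in step 1 can be sketched with java.util.concurrent.ConcurrentHashMap standing in for Spark's TimeStampedHashMap. The Java map returns null rather than an Option when the key was absent. The BlockInfo class here is a simplified, hypothetical version that only tracks pending/failed state with wait/notifyAll.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical, simplified BlockInfo: the writer marks it ready or failed,
// and readers block in waitForReady until one of those happens
final class BlockInfo {
  private var pending = true
  private var failed = false
  def markReady(): Unit = synchronized { pending = false; notifyAll() }
  def markFailure(): Unit = synchronized { pending = false; failed = true; notifyAll() }
  def waitForReady(): Boolean = synchronized {
    while (pending) wait()
    !failed                                            // true if the earlier put succeeded
  }
}

val blockInfo = new ConcurrentHashMap[String, BlockInfo]()

// Returns the info this caller should write into, or None if the block already exists
def acquirePutInfo(blockId: String): Option[BlockInfo] = {
  val tinfo = new BlockInfo
  val old = blockInfo.putIfAbsent(blockId, tinfo)      // atomic: only one caller inserts
  if (old == null) Some(tinfo)                         // we own the new entry
  else if (old.waitForReady()) None                    // a previous put succeeded: skip
  else Some(old)                                       // previous put failed: reuse its info
}
```

The atomic putIfAbsent guarantees that two threads writing the same BlockId never both create fresh entries. The loser waits for the winner instead.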

 

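2) Determine putLevel, the storage level actually used for the block. Use it to choose which BlockStore to write to: MemoryStore first, then the off-heap ExternalBlockStore, then DiskStore. Depending on how data is wrapped, the corresponding BlockStore method is called: putIterator, putArray, or putBytes. If the data is already serialized (ByteBufferValues) and putLevel.replication is greater than 1, replication to other nodes is started asynchronously before the local write.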

// startTimeMs, size, valuesAfterPut and bytesAfterPut are declared earlier in doPut (not shown)
// The level we actually use to put the block
val putLevel = effectiveStorageLevel.getOrElse(level)

// If we're storing bytes, then initiate the replication before storing them locally.
// This is faster as data is already serialized and ready to send.
val replicationFuture = data match {
  case b: ByteBufferValues if putLevel.replication > 1 =>
    // Duplicate doesn't copy the bytes, but just creates a wrapper
    val bufferView = b.buffer.duplicate()
    Future {
      // This is a blocking action and should run in futureExecutionContext which is a cached
      // thread pool
      replicate(blockId, bufferView, putLevel)
    }(futureExecutionContext)
  case _ => null
}

putBlockInfo.synchronized {
  logTrace("Put for block %s took %s to get into synchronized block"
    .format(blockId, Utils.getUsedTimeMs(startTimeMs)))

  var marked = false
  try {
    // returnValues - Whether to return the values put
    // blockStore - The type of storage to put these values into
    val (returnValues, blockStore: BlockStore) = {
      if (putLevel.useMemory) {
        // Put it in memory first, even if it also has useDisk set to true;
        // We will drop it to disk later if the memory store can't hold it.
        (true, memoryStore)
      } else if (putLevel.useOffHeap) {
        // Use external block store
        (false, externalBlockStore)
      } else if (putLevel.useDisk) {
        // Don't get back the bytes from put unless we replicate them
        (putLevel.replication > 1, diskStore)
      } else {
        assert(putLevel == StorageLevel.NONE)
        throw new BlockException(
          blockId, s"Attempted to put block $blockId without specifying storage level!")
      }
    }

    // Actually put the values
    val result = data match {
      case IteratorValues(iterator) =>
        blockStore.putIterator(blockId, iterator, putLevel, returnValues)
      case ArrayValues(array) =>
        blockStore.putArray(blockId, array, putLevel, returnValues)
      case ByteBufferValues(bytes) =>
        bytes.rewind()
        blockStore.putBytes(blockId, bytes, putLevel)
    }
    size = result.size
    result.data match {
      case Left (newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
      case Right (newBytes) => bytesAfterPut = newBytes
      case _ =>
    }
    // ... excerpt continues in step 3 (the try and synchronized blocks are closed later in doPut)
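The store-selection rule in step 2 reduces to a small pure function. The sketch assumes a simplified, hypothetical StorageLevel case class and returns store names as strings rather than BlockStore instances. The returned Boolean plays the role of returnValues.

```scala
// Hypothetical, simplified StorageLevel
final case class StorageLevel(
    useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean, replication: Int = 1)
val NONE = StorageLevel(false, false, false)

// putLevel = effectiveStorageLevel.getOrElse(level)
def resolveLevel(level: StorageLevel, effective: Option[StorageLevel]): StorageLevel =
  effective.getOrElse(level)

// Same priority as doPut: memory, then off-heap, then disk
def chooseStore(putLevel: StorageLevel): (Boolean, String) =
  if (putLevel.useMemory) (true, "memoryStore")        // may be spilled to disk later
  else if (putLevel.useOffHeap) (false, "externalBlockStore")
  else if (putLevel.useDisk) (putLevel.replication > 1, "diskStore") // keep bytes only to replicate
  else throw new IllegalArgumentException("Attempted to put block without specifying storage level!")
```

MEMORY_AND_DISK-style levels always start in memory. The disk flag only matters later, when the block is evicted through dropFromMemory.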

 

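3) After the write completes, add any blocks that were dropped from memory as a result of the write to updatedBlocks (an ArrayBuffer[(BlockId, BlockStatus)]). Then use getCurrentBlockStatus to get the status of the block just written. If that status's storage level is not NONE, mark putBlockInfo as ready so that other threads can read the block. If tellMaster is true, call reportBlockStatus to push the block's information to BlockManagerMasterEndpoint. Finally, add (blockId, putBlockStatus) to updatedBlocks.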

// Keep track of which blocks are dropped from memory
if (putLevel.useMemory) {
  result.droppedBlocks.foreach { updatedBlocks += _ }
}

val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
if (putBlockStatus.storageLevel != StorageLevel.NONE) {
  // Now that the block is in either the memory, externalBlockStore, or disk store,
  // let other threads read it, and tell the master about it.
  marked = true
  putBlockInfo.markReady(size)
  if (tellMaster) {
    reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
  }
  updatedBlocks += ((blockId, putBlockStatus))
}
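Step 3's bookkeeping can be written as a standalone function. In this sketch the parameters are hypothetical: the caller passes in the dropped blocks, the current status, and callbacks that stand in for putBlockInfo.markReady and reportBlockStatus. A status of "NONE" stands in for StorageLevel.NONE.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical status: storedAt == "NONE" means the put stored nothing
final case class BlockStatus(storedAt: String)

// Returns the (blockId, status) pairs that doPut hands back to its caller
def finishPut(
    blockId: String,
    useMemory: Boolean,
    droppedBlocks: Seq[(String, BlockStatus)],
    current: BlockStatus,
    tellMaster: Boolean,
    report: (String, BlockStatus) => Unit,
    markReady: () => Unit): Seq[(String, BlockStatus)] = {
  val updatedBlocks = ArrayBuffer[(String, BlockStatus)]()
  // Evictions caused by this write only happen when writing to memory
  if (useMemory) updatedBlocks ++= droppedBlocks
  if (current.storedAt != "NONE") {
    markReady()                                        // let other threads read the block
    if (tellMaster) report(blockId, current)           // reportBlockStatus
    updatedBlocks += ((blockId, current))
  }
  updatedBlocks.toSeq
}
```

Blocks evicted by this write are listed before the newly written block itself.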

 

4) If putLevel.replication is greater than 1, the block's data must be replicated to other nodes for fault tolerance.

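if (putLevel.replication > 1) {
  // Replicate the block to other nodes (body omitted in this excerpt).
  // For ByteBufferValues, replication already started asynchronously in step 2
  // (replicationFuture); other data is serialized first and then replicated.
}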
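Step 4 can be sketched as follows, assuming the usual Spark 1.x flow. For bytes input, replication already started in step 2, so doPut only waits on that future. For other data, the values are serialized and then replicated synchronously. `replicate` and `serialize` are hypothetical stand-ins: the first records payload sizes, and the second packs Ints into a ByteBuffer.

```scala
import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical replicate(): records the size of each payload "sent" to a peer
val sent = new ConcurrentLinkedQueue[Int]()
def replicate(bytes: ByteBuffer): Unit = sent.add(bytes.remaining())

// Hypothetical serializer for data that is not already bytes
def serialize(values: Seq[Int]): ByteBuffer = {
  val buf = ByteBuffer.allocate(4 * values.size)
  values.foreach(v => buf.putInt(v))
  buf.flip()                                           // make the written ints readable
  buf
}

def replicateIfNeeded(replication: Int, bytesFuture: Option[Future[Unit]], values: Seq[Int]): Unit =
  if (replication > 1) bytesFuture match {
    // Bytes input: replication started before the local put, so just wait for it
    case Some(f) => Await.ready(f, Duration.Inf)
    // Deserialized input: serialize now, then replicate
    case None    => replicate(serialize(values))
  }
```

Starting replication early for bytes input lets the network transfer overlap with the local write. Other input has to be serialized first, so replication happens afterwards.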
