Big Data: Spark Storage Block Management in a Cluster
Posted by raintungli
The Storage module
The abstraction mentioned most often in Spark is the RDD, but the data that RDDs work with is actually stored and managed by the Storage module.
Overall architecture of the Storage module
1. The storage layer
On a single node, Spark manages storage in units of blocks. Each block can live in memory or on disk; the BlockManager manages both the in-memory store and the on-disk store, and every block is identified by its block ID.
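To make block IDs and storage levels concrete, here is a minimal user-level sketch (not taken from the Spark source excerpts in this article): persisting an RDD turns each partition into one block in the local BlockManager, named rdd_<rddId>_<partitionIndex>, and the chosen StorageLevel decides whether that block lives in memory, on disk, or both.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object BlockStorageDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("block-storage-demo").setMaster("local[2]"))
    // Four partitions => four blocks, named rdd_<rddId>_0 .. rdd_<rddId>_3
    val rdd = sc.parallelize(1 to 1000, numSlices = 4)
    // MEMORY_AND_DISK: keep blocks in memory, spill to disk when memory runs short
    rdd.persist(StorageLevel.MEMORY_AND_DISK)
    rdd.count() // materializes the partitions and stores their blocks
    sc.stop()
  }
}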
2. Architecture in a cluster
2.1 Architecture
In a cluster, Spark manages blocks with a master-slave architecture:
- Master: holds the full information about every block (both local blocks and blocks on the slave nodes)
- Slave: obtains block information from the master and reports its own block information back to it
The master here is not the master that schedules tasks in the Spark cluster; it is the Driver, the client that submits the tasks. There is no standby design, because the Driver client is a single point anyway: once the Driver client crashes, the computation has no result either way. So in Storage's cluster management the master role is taken by the driver.
When an Executor runs a task, it fetches blocks locally through its BlockManager; if a block is not found locally, it asks the master for the remote copies and fetches the block from a remote node, as the broadcast-read code below (from TorrentBroadcast.readBlocks) shows:
for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
  val pieceId = BroadcastBlockId(id, "piece" + pid)
  logDebug(s"Reading piece $pieceId of $broadcastId")
  // First try getLocalBytes because there is a chance that previous attempts to fetch the
  // broadcast blocks have already fetched some of the blocks. In that case, some blocks
  // would be available locally (on this executor).
  bm.getLocalBytes(pieceId) match {
    case Some(block) =>
      blocks(pid) = block
      releaseLock(pieceId)
    case None =>
      bm.getRemoteBytes(pieceId) match {
        case Some(b) =>
          if (checksumEnabled) {
            val sum = calcChecksum(b.chunks(0))
            if (sum != checksums(pid)) {
              throw new SparkException(s"corrupt remote block $pieceId of $broadcastId:" +
                s" $sum != ${checksums(pid)}")
            }
          }
          // We found the block from remote executors/driver's BlockManager, so put the block
          // in this executor's BlockManager.
          if (!bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
            throw new SparkException(
              s"Failed to store $pieceId of $broadcastId in local BlockManager")
          }
          blocks(pid) = b
        case None =>
          throw new SparkException(s"Failed to get $pieceId of $broadcastId")
      }
  }
}
2.2 How the Executor locates a block
A unique block ID, for example: broadcast_0_piece0
The executor asks the master for that block's locations, that is, the set of BlockManagerIds that hold it:
/** Get locations of the blockId from the driver */
def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetLocations(blockId))
}
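A small usage sketch of the call above (it assumes a BlockManagerMaster instance named master is in scope; the piece ID mirrors the example at the start of this section):

val pieceId   = BroadcastBlockId(0, "piece0")   // prints as broadcast_0_piece0
val locations = master.getLocations(pieceId)    // Seq[BlockManagerId] that hold the piece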
Each location is a unique BlockManagerId, for example:
BlockManagerId(driver, 192.168.121.101, 55153, None)
A BlockManagerId consists of an executor ID (for the driver this is simply "driver"), the executor/driver host IP, and the executor/driver port. Every executor, and the driver itself, generates its own unique BlockManagerId.
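The following stand-in case class (a sketch, not Spark's internal BlockManagerId) spells out what the fields in that printed form correspond to:

// Field order follows the printed form: BlockManagerId(driver, 192.168.121.101, 55153, None)
case class IdSketch(
    executorId: String,            // "driver" for the driver, otherwise the executor ID
    host: String,                  // executor/driver host IP
    port: Int,                     // executor/driver block-transfer port
    topologyInfo: Option[String])  // optional rack/topology hint, usually None

val driverId = IdSketch("driver", "192.168.121.101", 55153, None)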
2.3 How the Executor fetches the block contents
def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
  logDebug(s"Getting remote block $blockId")
  require(blockId != null, "BlockId is null")
  var runningFailureCount = 0
  var totalFailureCount = 0
  val locations = getLocations(blockId)
  val maxFetchFailures = locations.size
  var locationIterator = locations.iterator
  while (locationIterator.hasNext) {
    val loc = locationIterator.next()
    logDebug(s"Getting remote block $blockId from $loc")
    val data = try {
      blockTransferService.fetchBlockSync(
        loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
    } catch {
      case NonFatal(e) =>
        runningFailureCount += 1
        totalFailureCount += 1

        if (totalFailureCount >= maxFetchFailures) {
          // Give up trying anymore locations. Either we've tried all of the original locations,
          // or we've refreshed the list of locations from the master, and have still
          // hit failures after trying locations from the refreshed list.
          logWarning(s"Failed to fetch block after $totalFailureCount fetch failures. " +
            s"Most recent failure cause:", e)
          return None
        }

        logWarning(s"Failed to fetch remote block $blockId " +
          s"from $loc (failed attempt $runningFailureCount)", e)

        // If there is a large number of executors then locations list can contain a
        // large number of stale entries causing a large number of retries that may
        // take a significant amount of time. To get rid of these stale entries
        // we refresh the block locations after a certain number of fetch failures
        if (runningFailureCount >= maxFailuresBeforeLocationRefresh) {
          locationIterator = getLocations(blockId).iterator
          logDebug(s"Refreshed locations from the driver " +
            s"after $runningFailureCount fetch failures.")
          runningFailureCount = 0
        }

        // This location failed, so we retry fetch from a different one by returning null here
        null
    }

    if (data != null) {
      return Some(new ChunkedByteBuffer(data))
    }
    logDebug(s"The value of block $blockId is null")
  }
  logDebug(s"Block $blockId not found")
  None
}
Walking through the list of BlockManagerIds returned by the master, the executor takes one server that holds the block at a time and calls

blockTransferService.fetchBlockSync(
  loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()

to fetch the block contents synchronously; if the fetch from that server fails, it moves on to the next server that holds the block.
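The threshold maxFailuresBeforeLocationRefresh used in getRemoteBytes is read from a configuration key, assuming a SparkConf named conf is in scope; the key name and default shown here are recalled from the Spark 2.x sources and should be verified against the version in use.

// Assumed from Spark 2.x: after this many consecutive fetch failures the executor
// refreshes the block locations from the driver master.
val maxFailuresBeforeLocationRefresh =
  conf.getInt("spark.block.failures.beforeLocationRefresh", 5)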
2.4 Registering the BlockManager
When the Driver initializes the SparkContext, BlockManager.initialize runs and registers this BlockManager with the master:

val idFromMaster = master.registerBlockManager(
  id,
  maxMemory,
  slaveEndpoint)
The registration goes through the BlockManagerMaster:
def registerBlockManager(
    blockManagerId: BlockManagerId,
    maxMemSize: Long,
    slaveEndpoint: RpcEndpointRef): BlockManagerId = {
  logInfo(s"Registering BlockManager $blockManagerId")
  val updatedId = driverEndpoint.askWithRetry[BlockManagerId](
    RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint))
  logInfo(s"Registered BlockManager $updatedId")
  updatedId
}
In BlockManagerMaster the endpoint is hard-wired to the driver, so by default the driver is the master. Both the driver and the executors send a registration message to the driver master after initializing their BlockManager; the only difference is that the driver identifies itself with the ID "driver", while an executor identifies itself by its executor ID.
2.5 The driver master's endpoint
As the previous section showed, both the driver and the executors send messages to the driver's master. When SparkEnv.create runs on the driver and on each executor, it creates the BlockManagerMaster:

val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
  BlockManagerMaster.DRIVER_ENDPOINT_NAME,
  new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
  conf, isDriver)
registerOrLookupEndpoint either registers the endpoint (on the driver) or looks it up (on an executor):
def registerOrLookupEndpoint(
    name: String, endpointCreator: => RpcEndpoint):
  RpcEndpointRef = {
  if (isDriver) {
    logInfo("Registering " + name)
    rpcEnv.setupEndpoint(name, endpointCreator)
  } else {
    RpcUtils.makeDriverRef(name, conf, rpcEnv)
  }
}
The code shows that an RPC endpoint is set up only when isDriver is true, in the default Netty RPC environment, under the name BlockManagerMaster:
spark://BlockManagerMaster@192.168.121.101:40978
The driver and all executors send their block-management messages to this master endpoint (port 40978 in this example).
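A hedged sketch of what the executor-side branch (RpcUtils.makeDriverRef) resolves to, assuming a SparkConf named conf and an RpcEnv named rpcEnv are in scope; spark.driver.host and spark.driver.port are the standard driver-address settings:

import org.apache.spark.rpc.RpcAddress

// Executors build a reference to the driver's BlockManagerMaster endpoint by address and name
val driverHost = conf.get("spark.driver.host")
val driverPort = conf.get("spark.driver.port").toInt
val masterRef  = rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort), "BlockManagerMaster")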
2.6 Message formats between the master and the executors
Each case in the code below (from BlockManagerMasterEndpoint) is one message format exchanged between the master and the executors:

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) =>
    context.reply(register(blockManagerId, maxMemSize, slaveEndpoint))

  case _updateBlockInfo @
      UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
    context.reply(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size))
    listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))

  case GetLocations(blockId) =>
    context.reply(getLocations(blockId))

  case GetLocationsMultipleBlockIds(blockIds) =>
    context.reply(getLocationsMultipleBlockIds(blockIds))

  case GetPeers(blockManagerId) =>
    context.reply(getPeers(blockManagerId))

  case GetExecutorEndpointRef(executorId) =>
    context.reply(getExecutorEndpointRef(executorId))

  case GetMemoryStatus =>
    context.reply(memoryStatus)

  case GetStorageStatus =>
    context.reply(storageStatus)

  case GetBlockStatus(blockId, askSlaves) =>
    context.reply(blockStatus(blockId, askSlaves))

  case GetMatchingBlockIds(filter, askSlaves) =>
    context.reply(getMatchingBlockIds(filter, askSlaves))

  case RemoveRdd(rddId) =>
    context.reply(removeRdd(rddId))

  case RemoveShuffle(shuffleId) =>
    context.reply(removeShuffle(shuffleId))

  case RemoveBroadcast(broadcastId, removeFromDriver) =>
    context.reply(removeBroadcast(broadcastId, removeFromDriver))

  case RemoveBlock(blockId) =>
    removeBlockFromWorkers(blockId)
    context.reply(true)

  case RemoveExecutor(execId) =>
    removeExecutor(execId)
    context.reply(true)

  case StopBlockManagerMaster =>
    context.reply(true)
    stop()

  case BlockManagerHeartbeat(blockManagerId) =>
    context.reply(heartbeatReceived(blockManagerId))

  case HasCachedBlocks(executorId) =>
    blockManagerIdByExecutor.get(executorId) match {
      case Some(bm) =>
        if (blockManagerInfo.contains(bm)) {
          val bmInfo = blockManagerInfo(bm)
          context.reply(bmInfo.cachedBlocks.nonEmpty)
        } else {
          context.reply(false)
        }
      case None => context.reply(false)
    }
}
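On the executor side, each of these messages has a small wrapper in BlockManagerMaster that sends it with askWithRetry, in the same style as the getLocations helper shown in 2.2; a representative sketch (the exact wrapper name in Spark may differ slightly):

// Ask the driver master which other BlockManagers are peers of this one
def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] =
  driverEndpoint.askWithRetry[Seq[BlockManagerId]](GetPeers(blockManagerId))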
2.7 Structures on the master
The master keeps, for every executor, the corresponding BlockManagerId and BlockManagerInfo, and the BlockManagerInfo in turn records the status of each block. Executors actively report their status to the master through heartbeats, and the master updates the executor state behind its endpoint. Block status changes on an executor are also reported to the master, but they only update the master's state; they are not pushed to the other executors.
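A simplified sketch of the bookkeeping this paragraph describes (stand-in maps with plain String keys; the real structures in BlockManagerMasterEndpoint use the BlockManagerId, BlockId and BlockManagerInfo types):

import scala.collection.mutable

// executor ID -> the BlockManagerId that executor registered with
val blockManagerIdByExecutor = mutable.Map.empty[String, String]
// BlockManagerId -> per-manager info: last heartbeat time and the blocks it currently holds
case class ManagerInfoSketch(var lastSeenMs: Long, blocks: mutable.Set[String])
val blockManagerInfo = mutable.Map.empty[String, ManagerInfoSketch]
// block ID -> the set of BlockManagerIds that currently hold a copy of that block
val blockLocations = mutable.Map.empty[String, mutable.Set[String]]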
In the interaction between executors and the master, it is the executors that actively push and pull data. The master only tracks executor status plus the locations and status of blocks on the driver and the executors, so its load is small. No high-availability design is provided for the master; it normally runs on the node of the Driver that submitted the job.