Spark Shuffle理解
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark Shuffle理解相关的知识,希望对你有一定的参考价值。
参考技术A spark shuffle 演进的历史目前版本的shuffle, 都是使用排序相关的shuffle; 整体上spark shuffle分为shuffle read和shuffle write:
大体上经过排序, 聚合, 归并(多个文件spill磁盘的情况), 最终, 每个task生成2种文件: 数据文件和索引文件.
SortShuffleWriter是日常使用最频繁的shuffle过程; SortShuffleWriter主要使用 ExternalSorter 对数据进行排序, 合并, 聚合(combine). 最后产生数据文件和索引文件
这个问题就是上述流程中, 第二点, MemoryManager怎么判断是否仍有内存空间留给内存中的shuffle write数据, 是否需要spill PartitionedAppendOnlyMap 和 PartitionedPairBuffer 的数据到磁盘? 这个问题的主要难处在于, spark内存中的数据都是有用数据, 往往无法通过GC自主控制内存, 所以如果spill时机检测的不及时, 即使产生GC可能仍会导致OOM问题. 但是如果每放入 PartitionedAppendOnlyMap 和 PartitionedPairBuffer
中一条数据就检测内存占用情况, 会导致效率极其低下. Spark如何实现呢?
我们说shuffle是可能会产生OOM的原因有2个:
UnsafeShuffleWriter 是 SortShuffleWriter 的优化版本,Tungsten-sort优化点主要在三个方面:
Spark 默认开启的是Sort Based Shuffle,想要打开Tungsten-sort ,请设置
对应的实现类是:
SPARK SHUFFLE中 ShuffleId BlockManagerId 以及 与ESS(External Shuffle Server)交互
背景
本文基于SPARK 3.2.1
用来更好的理解spark shuffle中的点点滴滴
分析
- 我们直接从SortShuffleManager着手,因为这是个shuffle的纽带:
override def registerShuffle[K, V, C](
shuffleId: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle
...
override def getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] =
...
override def getWriter[K, V](
handle: ShuffleHandle,
mapId: Long,
context: TaskContext,
metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] =
...
跟shuffle紧密关联的是这三个方法,
- 其中
registerShuffle
方法是在ShuffleDependency实例构建出来的时候机会被调用:
val shuffleId: Int = _rdd.context.newShuffleId()
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, this)
其中shuffleId 是全局单调递增的,(其实这是为了一次shuffle的标示,是下游task获取上游task数据的label)
getWriter
方法是ShuffleMapTask RunTask中被调用的:
ShuffleMapTask.runTask
||
\\/
dep.shuffleWriterProcessor.write
||
\\/
writer = manager.getWriter[Any, Any](
handle: ShuffleHandle,
这里会根据已经注册好的shuffleHandle来获取对应的writer
getReader
方法下游task读取shuffle数的时候被调用的:
ShuffledRowRDD.compure
||
\\/
SparkEnv.get.shuffleManager.getReader(
dependency.shuffleHandle,..)
||
\\/
val address = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
这里会根据已经注册好的shuffleHandle来获取对应的reader,这里会调用getMapSizesByExecutorId最终根据shuffleId来获取(Array[MapStatus], Array[MergeStatus])
这里包括了所有的MapStatus和MergeStatus,这样reader就能根据策略来向不同的blockManager发送shuffle fetch请求,以保证远程Executor负载均衡。
具体的写操作和读操作细节,读者可以去自己细看代码,我们这里只说一些总体的数据流思路。
- 再来看BlockManager,BlockManager是每个Executor都会有的,在SparkEnv创建的时候就会创建,用来管理数据块的存储的,
其中shuffle 数据的读取和写入都是和他有关联的。
分析一下BlockManager的跟shuffle有关的重要方法:
private[spark] val blockStoreClient = externalBlockStoreClient.getOrElse(blockTransferService)
def initialize(appId: String): Unit =
...
val id =
BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)
blockManagerId = if (idFromMaster != null) idFromMaster else id
shuffleServerId = if (externalShuffleServiceEnabled)
logInfo(s"external shuffle service port = $externalShuffleServicePort")
BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
else
blockManagerId
if (externalShuffleServiceEnabled && !blockManagerId.isDriver)
registerWithExternalShuffleServer()
...
...
-
blockStoreClient
变量,用来读取其他Executor的Blocks文件的,也就是shuffle数据真正去读数的组件,
这在创建BlockManager的时候,如果开启ESS的话就会创建的:val externalShuffleClient = if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) ...
如果没开启ESS的话,就用自带的BlockTransferService。
-
shuffleServerId
也就是blockManagerId,会在Executor创建的时候初始化,
如果开启ESS,端口就是spark.shuffle.service.port
,默认7337,否则就是spark.blockManager.port
,默认是随机端口:
Executor中env.blockManager.initialize(conf.getAppId)
||
\\/
registerWithExternalShuffleServer()
registerWithExternalShuffleServer
这个方法是用来注册ESS的(如果开启ESS的情况下):
val shuffleConfig = new ExecutorShuffleInfo(
diskBlockManager.localDirsString,
diskBlockManager.subDirsPerLocalDir,
shuffleManagerMeta)
// Synchronous and will throw an exception if we cannot connect.
blockStoreClient.asInstanceOf[ExternalBlockStoreClient].registerWithShuffleServer(
shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)
注册的信息包括block本地磁盘的位置,以及shuffleManagerMeta信息,注意如果这里开启push-based shuffle Server的话,就返回的是merge的路径,
否则返回sortShuffleManager的类名。
至于具体向哪个shuffle server实例注册,就是从shuffleServerId中获取的。 之后向对应的ESS发送RegisterExecutor消息
再来看YarnShuffleService
对RegisterExecutor消息的回应(实际上是ExternalBlockHandler
来处理的):
else if (msgObj instanceof RegisterExecutor)
final Timer.Context responseDelayContext =
metrics.registerExecutorRequestLatencyMillis.time();
try
RegisterExecutor msg = (RegisterExecutor) msgObj;
checkAuth(client, msg.appId);
blockManager.registerExecutor(msg.appId, msg.execId, msg.executorInfo);
mergeManager.registerExecutor(msg.appId, msg.executorInfo);
callback.onSuccess(ByteBuffer.wrap(new byte[0]));
finally
responseDelayContext.stop();
其中blockManager
是把逻辑shuffle block转换为实际物理存储的组件,这里的注册就是blockManager对物理文件(LocalDirs等)做映射关系
mergeManager
就是push based shuffle Manager进行文件merge的组件,也就就是把merge的路径以及物理文件(LocalDirs等给注册上去,便于后续获取shuffle文件。
注意:
blockStoreClient
是Executor 存在的的时候才会有的组件,因为是去拉取shuffle数据;
而通过registerWithExternalShuffleServer注册的ESS组件是,可以在Executor不存在的时候提供服务的。这两者是有区别的。
至此SPARK SHUFFLE简单的流程就是这样了。
以上是关于Spark Shuffle理解的主要内容,如果未能解决你的问题,请参考以下文章
华为云技术分享快速理解spark-on-k8s中的external-shuffle-service
SPARK SHUFFLE中 ShuffleId BlockManagerId 以及 与ESS(External Shuffle Server)交互