Spark Shuffle理解

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark Shuffle理解相关的知识,希望对你有一定的参考价值。

参考技术A spark shuffle 演进的历史

目前版本的shuffle, 都是使用排序相关的shuffle; 整体上spark shuffle分为shuffle read和shuffle write:

大体上经过排序, 聚合, 归并(多个文件spill磁盘的情况), 最终, 每个task生成2种文件: 数据文件和索引文件.

SortShuffleWriter是日常使用最频繁的shuffle过程; SortShuffleWriter主要使用 ExternalSorter 对数据进行排序, 合并, 聚合(combine). 最后产生数据文件和索引文件

    这个问题就是上述流程中, 第二点, MemoryManager怎么判断是否仍有内存空间留给内存中的shuffle write数据, 是否需要spill PartitionedAppendOnlyMap 和 PartitionedPairBuffer 的数据到磁盘? 这个问题的主要难处在于, spark内存中的数据都是有用数据, 往往无法通过GC自主控制内存, 所以如果spill时机检测的不及时, 即使产生GC可能仍会导致OOM问题. 但是如果每放入 PartitionedAppendOnlyMap 和 PartitionedPairBuffer
中一条数据就检测内存占用情况, 会导致效率极其低下. Spark如何实现呢?

我们说shuffle是可能会产生OOM的原因有2个:

UnsafeShuffleWriter 是 SortShuffleWriter 的优化版本,Tungsten-sort优化点主要在三个方面:

Spark 默认开启的是Sort Based Shuffle,想要打开Tungsten-sort ,请设置

对应的实现类是:

SPARK SHUFFLE中 ShuffleId BlockManagerId 以及 与ESS(External Shuffle Server)交互

背景

本文基于SPARK 3.2.1
用来更好的理解spark shuffle中的点点滴滴

分析

  1. 我们直接从SortShuffleManager着手,因为这是个shuffle的纽带:
override def registerShuffle[K, V, C](
     shuffleId: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle 
...

 override def getReader[K, C](
      handle: ShuffleHandle,
      startMapIndex: Int,
      endMapIndex: Int,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext,
      metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = 
    ...

 override def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Long,
      context: TaskContext,
      metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = 
...

跟shuffle紧密关联的是这三个方法,

  • 其中registerShuffle方法是在ShuffleDependency实例构建出来的时候机会被调用:
 val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, this)

其中shuffleId 是全局单调递增的,(其实这是为了一次shuffle的标示,是下游task获取上游task数据的label)

  • getWriter方法是ShuffleMapTask RunTask中被调用的:
ShuffleMapTask.runTask
     ||
     \\/
dep.shuffleWriterProcessor.write
     ||
     \\/
writer = manager.getWriter[Any, Any](
      handle: ShuffleHandle,

这里会根据已经注册好的shuffleHandle来获取对应的writer

  • getReader方法下游task读取shuffle数的时候被调用的:
ShuffledRowRDD.compure
      ||
      \\/
SparkEnv.get.shuffleManager.getReader(
    dependency.shuffleHandle,..)
     ||
     \\/
val address = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
          handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)

这里会根据已经注册好的shuffleHandle来获取对应的reader,这里会调用getMapSizesByExecutorId最终根据shuffleId来获取(Array[MapStatus], Array[MergeStatus])
这里包括了所有的MapStatus和MergeStatus,这样reader就能根据策略来向不同的blockManager发送shuffle fetch请求,以保证远程Executor负载均衡。
具体的写操作和读操作细节,读者可以去自己细看代码,我们这里只说一些总体的数据流思路。

  1. 再来看BlockManager,BlockManager是每个Executor都会有的,在SparkEnv创建的时候就会创建,用来管理数据块的存储的,
    其中shuffle 数据的读取和写入都是和他有关联的。
    分析一下BlockManager的跟shuffle有关的重要方法:
  private[spark] val blockStoreClient = externalBlockStoreClient.getOrElse(blockTransferService)

   
  
   def initialize(appId: String): Unit = 
        ...
     val id =
      BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)
         blockManagerId = if (idFromMaster != null) idFromMaster else id
      shuffleServerId = if (externalShuffleServiceEnabled) 
      logInfo(s"external shuffle service port = $externalShuffleServicePort")
      BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
     else 
      blockManagerId
    
     if (externalShuffleServiceEnabled && !blockManagerId.isDriver) 
      registerWithExternalShuffleServer()
      ...
    
    ...
   
  • blockStoreClient 变量,用来读取其他Executor的Blocks文件的,也就是shuffle数据真正去读数的组件,
    这在创建BlockManager的时候,如果开启ESS的话就会创建的:

    val externalShuffleClient = if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) 
        ...
    

    如果没开启ESS的话,就用自带的BlockTransferService。

  • shuffleServerId也就是blockManagerId,会在Executor创建的时候初始化,
    如果开启ESS,端口就是spark.shuffle.service.port,默认7337,否则就是spark.blockManager.port,默认是随机端口:

Executor中env.blockManager.initialize(conf.getAppId)
      ||
      \\/
registerWithExternalShuffleServer()

registerWithExternalShuffleServer这个方法是用来注册ESS的(如果开启ESS的情况下):

val shuffleConfig = new ExecutorShuffleInfo(
      diskBlockManager.localDirsString,
      diskBlockManager.subDirsPerLocalDir,
      shuffleManagerMeta)

// Synchronous and will throw an exception if we cannot connect.
 blockStoreClient.asInstanceOf[ExternalBlockStoreClient].registerWithShuffleServer(
    shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)

注册的信息包括block本地磁盘的位置,以及shuffleManagerMeta信息,注意如果这里开启push-based shuffle Server的话,就返回的是merge的路径,
否则返回sortShuffleManager的类名。
至于具体向哪个shuffle server实例注册,就是从shuffleServerId中获取的。 之后向对应的ESS发送RegisterExecutor消息
再来看YarnShuffleService对RegisterExecutor消息的回应(实际上是ExternalBlockHandler来处理的):

 else if (msgObj instanceof RegisterExecutor) 
      final Timer.Context responseDelayContext =
        metrics.registerExecutorRequestLatencyMillis.time();
      try 
        RegisterExecutor msg = (RegisterExecutor) msgObj;
        checkAuth(client, msg.appId);
        blockManager.registerExecutor(msg.appId, msg.execId, msg.executorInfo);
        mergeManager.registerExecutor(msg.appId, msg.executorInfo);
        callback.onSuccess(ByteBuffer.wrap(new byte[0]));
       finally 
        responseDelayContext.stop();
      

其中blockManager是把逻辑shuffle block转换为实际物理存储的组件,这里的注册就是blockManager对物理文件(LocalDirs等)做映射关系
mergeManager就是push based shuffle Manager进行文件merge的组件,也就就是把merge的路径以及物理文件(LocalDirs等给注册上去,便于后续获取shuffle文件。

注意
blockStoreClient是Executor 存在的的时候才会有的组件,因为是去拉取shuffle数据;
而通过registerWithExternalShuffleServer注册的ESS组件是,可以在Executor不存在的时候提供服务的。这两者是有区别的。

至此SPARK SHUFFLE简单的流程就是这样了。

以上是关于Spark Shuffle理解的主要内容,如果未能解决你的问题,请参考以下文章

Spark--Shuffle

华为云技术分享快速理解spark-on-k8s中的external-shuffle-service

SPARK SHUFFLE中 ShuffleId BlockManagerId 以及 与ESS(External Shuffle Server)交互

[spark] shuffle

spark浅谈:

spark和mapreduce的shuffle