[spark] shuffle
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[spark] shuffle相关的知识,希望对你有一定的参考价值。
参考技术A spark 3.2.0In sort-based shuffle, store map outputs in serialized form -> spark-4550
Faster sort-based shuffle path using binary processing cache-aware sort -> SPARK-7081
可插拔的 shuffle 接口, 通过 spark.shuffle.manager 配置,默认实现是 SortShuffleManager. 在创建 SparkEnv 的时候进行初始化, 初始化时条用构造方法可以是 SparkConf , 也可以是 SparkConf 、 isDriver 两个构造参数
包含了Shuffle 的相关信息,用来传递信息到Task . 不同的ShuffleHandle 决定不同的 ShuffleWriter . 在 driver 端创建 ShuffleDependency 时通过 ShuffleManager 的 registerShuffle 方法返回
Spark 当前序列化的方式有三种 JavaSerializer、KryoSerializer、UnsafeRowSerializer 默认是 JavaSerializer 可以通过 spark.serializer 参数配置
不满足 BypassMergeSortShuffleHandle 、SerializedShuffleHandle 的条件
用来处理 map task 写数据
当 ShuffleHandle 的类型是 BypassMergeShuffleHandle 时使用 BypassMergeSortShuffleWriter . 处理数据时,对每个 partition 都创建一个 DiskBlockObjectWriter 写入数据, 不放内存没有spill, 直接写磁盘。 最后把临时的文件合并为最终的文件, 更新MapStatus 记录每个分区的长度 , MapStatus 是在调用 writer 的 stop 方法中返回的
当ShuffleHandle 的类型是 SerializedShuffleHandle 时使用 UnsafeShuffleWriter , write 方法中对数据处理主要逻辑在 ShuffleExternalSorter 中实现, 在closeAndWriteOutput 方法中对 spill 文件合并和生成 MapStatus
ShuffleExternalSorter 针对 sort-based shuffle 的外部排序,处理的是二进制的数据,内存中的数据按照partitionId 进行排序, 也是根据 partitionId spill 磁盘, 数据都是序列化压缩好的。 最后通过 UnsafeShuffleWriter 把这些文件进行合并,合并过程中避免序列化反序列化(closeAndWriteOutput方法中实现)
closeAndWriteOutput 方法对 spill 的文件进行合并,文件是序列化的,按照partitionId 排序好的,因此可以直接进行merge
当 ShuffleHandle 的类型是 BaseShuffleHandle 时使用 SortShuffleWriter, 对数据的处理使用 ExternalSorter,可以处理 map-side aggregation , 实现内存中的排序, spill 文件的合并
抽象Executor端的shuffle 操作, 用于其他的扩展实现。
用来处理 map task 中全部partition 的数据写, 包含多个 ShufflePartitionWriter
用来处理一个 partition 的数据写
主要是ShuffledRDD 使用ShuffleReader 读取数据
默认只有BlockStoreShuffleReader 具体实现 ShuffleBlockFetcherIterator
spark和mapreduce的shuffle
既然spark和mapreduce的shuffle过程都落地到磁盘,那为什么spark比mapreduce要快啊?
参考技术A Apache Spark 的 Shuffle 过程与 Apache Hadoop 的 Shuffle 过程有着诸多类似,一些概念可直接套用,例如,Shuffle 过程中,提供数据的一端,被称作 Map 端,Map 端每个生成数据的任务称为 Mapper,对应的,接收数据的一端,被称作 Reduce 端,Reduce 端每个拉取数据的任务称为 Reducer,Shuffle 过程本质上都是将 Map 端获得的数据使用分区器进行划分,并将数据发送给对应的 Reducer 的过程。以上是关于[spark] shuffle的主要内容,如果未能解决你的问题,请参考以下文章