SortShuffle之UnsafeShuffleWriter

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SortShuffle之UnsafeShuffleWriter相关的知识,希望对你有一定的参考价值。

参考技术A #实现方式参考图:

#UnsafeShuffleWriter内部使用了和BytesToBytesMap基本相同的数据结构处理map端的输出,不过将其细化为ShuffleExternalSorter和ShuffleInMemorySorter两部分,功能如下

ShuffleExternalSorter 使用MemoryBlock存储数据,每条记录包括长度信息和K-V Pair

ShuffleInMemorySorter 使用long数组存储每条记录对应的位置信息(page number + offset),以及其对应的PartitionId,共8 bytes

写文件或溢写前(spill到disk前),根据数据的PartitionId信息,使用TimSort算法对ShuffleInMemorySorter的long数组排序,排序的结果为,PartitionId相同的聚集在一起,且PartitionId较小的排在前面,ShuffleExternalSorter中的数据不需要处理,如下图所示:

依次读取ShuffleInMemorySorter中long数组的元素,再根据page number和offset信息去ShuffleExternalSorter中读取K-V Pair写入文件,如下图所示:

内存不足时,溢写数据到磁盘,每次溢写会生成上图中的一个 dataFile ,如果多次溢写产生多个dataFile,会在map端数据处理结束后进行merge合并为一个dataFile,如下图所示:

至此,UnsafeShuffleWriter的实现就介绍完了。

SPARK-7081中简述了UnsafeShuffleManager的优势,如下介绍:

1. ShuffleExternalSorter使用UnSafe API操作序列化数据,而不是Java对象,减少了内存占用及因此导致的GC耗时(参考Spark 内存管理之Tungsten),这个优化需要Serializer支持relocation。

2. ShuffleExternalSorter存原始数据,ShuffleInMemorySorter使用压缩指针存储元数据,每条记录仅占8 bytes,并且排序时不需要处理原始数据,效率高。

3. 溢写 & 合并这一步操作的是同一Partition的数据,因为使用UnSafe API直接操作序列化数据,合并时不需要反序列化数据。

4. 溢写 & 合并可以使用fastMerge提升效率(调用NIO的transferTo方法),设置spark.shuffle.unsafe.fastMergeEnabled为true,并且如果使用了压缩,需要压缩算法支持SerializedStreams的连接,各默认值如下

Spark Shuffle之Sort Shuffle 中讨论了使用UnsafeShuffleWriter需满足的前提条件,如下

#接下来分析下为什么要满足这三个要求

1. map-side aggregation:从上面的实现也可以看出,UnsafeShuffleWriter不是类似HashMap的数据结构,无法聚合key对应的value,所以无法支持map端的aggregation。

2. Partition数小于16777216:参考第一幅图,存储PartitionId信息使用24bit,能表示的最大值为 (1 << 24) = 16777215,因此Partition数要小于16777216。

3. Serializer支持relocation:原始数据首先被序列化处理,并且再也不需要反序列,在其对应的元数据被排序后,需要Serializer支持relocation,在指定位置读取对应数据。

本文介绍tungsten-sort(UnsafeShuffleWriter)的实现、优势及何种情况下被Spark使用。

#

以上是关于SortShuffle之UnsafeShuffleWriter的主要内容,如果未能解决你的问题,请参考以下文章

Spark之SortShuffle原理参考

大数据之Spark:Spark 的两种核心 Shuffle

Flink关于Flink:Flink-SortShuffle-实现简介

只了解MapReduce的Shuffle?Spark Shuffle了解一下

FlinkFlink Sort-Shuffle写流程简析

Spark Shuffle原理详解