Spark之SortShuffle原理参考

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark之SortShuffle原理参考相关的知识,希望对你有一定的参考价值。

参考技术A

Spark 0.8及以前 Hash Based Shuffle
Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制
Spark 0.9 引入ExternalAppendOnlyMap
Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle
Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle
Spark 1.4 引入Tungsten-Sort Based Shuffle
Spark 1.6 Tungsten-sort并入Sort Based Shuffle
Spark 2.0 Hash Based Shuffle退出历史舞台

Spark支持Hash Shuffle和Sort Shuffle,早期版本使用Hash Shuffle(包括优化后的Hash Shuffle)。Spark1.2起默认使用Sort Shuffle,并且Sort Shuffle在map端有三种实现,分别是UnsafeShuffleWriter、BypassMergeSortShuffleWriter、SortShuffleWriter,根据运行时信息自动选择对应的实现。

补充说明:上面SortShuffleWriter中提到的Partition,不是RDD中的Partition,而是类似Spark Shuffle之Hash Shuffle中的bucket,如果没有单独说明,Sort Shuffle相关文章中的Partition均为bucket,和源码中的变量名保持一致。

其中Serializer支持relocation

上面提到UnsafeShuffleWriter需要Serializer支持relocation,Serializer支持relocation是指,Serializer可以对已经序列化的对象进行排序,这种排序起到的效果和先对数据排序再序列化一致。Serializer的这个属性会在UnsafeShuffleWriter进行排序时用到,具体参考Introduce internal Serializer API for determining if serializers support object relocation #5924。支持relocation的Serializer是KryoSerializer,Spark默认使用JavaSerializer,可以通过参数spark.serializer设置。

其中Sort Shuffle设置: 上述三种ShuffleWriter实现均由SortShuffleManager管理

一、Spark中的Sorted-Based Shuffle产出的结果是并没有排序的,也就是说Shuffle的Reduce阶段是没有进行排序操作的,这点和MR不一样。
二、Spark中的Sorted-Based Shuffle只是中间结果排序,也就是说Shuffle的Mapper阶段在将bucket缓存Spill到磁盘的时候进行了排序操作,生成了FileSegment,其中涉及到一个排序算法TimSort。合并FileSegment的为一个文件的同时,生成一个索引文件。
三、排序操作相当于合并相同的Key,聚合数据在一起,便于后续Reducer阶段读取相应的数据。

Sorted-Based Shuffle 的核心是借助于 ExternalSorter 把每个 ShuffleMapTask 的输出,排序到一个文件中 (FileSegmentGroup),为了区分下一个阶段 Reducer Task 不同的内容,它还需要有一个索引文件 (Index) 来告诉下游 Stage 的并行任务,那一部份是属于你的。

Shuffle Map Task 在ExternalSorter 溢出到磁盘的时候,产生一组 File#(File Group是hashShuffle中的概念,理解为一个file文件池,这里为区分,使用File的概念,FileSegment根据PartionID排序) 和 一个索引文件,File 里的 FileSegement 会进行排序,在 Reducer 端有4个Reducer Task,下游的 Task 可以很容易跟据索引 (index) 定位到这个 Fie 中的哪部份 FileSegement 是属于下游的,它相当于一个指针,下游的 Task 要向 Driver 去碓定文件在那里,然后到了这个 File 文件所在的地方,实际上会跟 BlockManager 进行沟通,BlockManager 首先会读一个 Index 文件,根据它的命名则进行解析,比如说下一个阶段的第一个 Task,一般就是抓取第一个 Segment,这是一个指针定位的过程。

补充说明:Sort-Based Shuffle 最大的意义是减少临时文件的输出数量,且只会产生两个文件:一个是包含不同内容划分成不同 FileSegment 构成的单一文件 File,另外一个是索引文件 Index。

重要提示:在Sorted-Shuffle中会排序吗?Sort-Based Shuffle的Mapper端在 Sort and Spill 的过程中会排序操作,而且是Spill到磁盘的时候再进行排序的。但在Reducer阶段的ApependOnlyMap过程不进行排序的。

Spark早期版本采用的是AppendOnlyMap来实现shuffle reduce阶段数据的聚合,当数据量不大时没什么问题,但当数据量很大时就会占用大量内存,最后可能OOM。所以从spark 0.9开始就引入了ExternalAppendOnlyMap来代替AppendOnlyMap。

只了解MapReduce的Shuffle?Spark Shuffle了解一下

点击蓝字


作者 | 小猴

编辑 | 小猴

分享Java、大数据内容


# 本篇要解决的问题

Spark之前的HashShuffle会有什么问题?

说一下Spark中的HashShuffle的工作方式?

是不是HashShuffle一定比SortShuffle效率低?

说一下Spark中的SortShuffle的工作方式?

Spark中的SortShuffle和MapReduce Shuffle的区别?

1

Shuffle简介


1

请把MapReduce和Spark联系起来

最近发现,很多人在用Spark的时候,根本没法和MapReduce建立联系。还有人告诉我,Spark跟MapReduce没有关系。 “之前写的MapReduce必须要实现一个Mapper、Reducer,还要自己创建一个Job,让它执行起来。而现在写的Spark程序我只需要写算子就可以了。根本没有Mapper、Reducer”。只了解MapReduce的Shuffle?Spark Shuffle了解一下


听完这句话,大家是不是有一种莫名的认同感只了解MapReduce的Shuffle?Spark Shuffle了解一下。因为大多数人学Spark的时候,也没有想着要与MapReduce建立联系。更有人说,现在谁还用MapReduce啊,学Spark就可以了。我可以很负责的告诉大家,如果大家没能把Spark和MapReduce建立关联,那基本上等于Spark没有学会。


2

Spark shuffle简介


在Apache Spark中,Spark Shuffle描述了map任务和reduce任务之间的过程,Spark也只是MapReduce的一种实现而已,我们看到的各种算子操作,其实从根本上来讲还是Map和Reduce,只不过Spark提供了High Level API,让我们开发起来更简单而已。


Shuffle是Spark重新分配数据的一种机制,简单理解就是对数据进行重新分组。因为数据是分布式存储,每个分区的数据存储在不同的服务器中,所以在Shuffle期间,会写入到本地磁盘并在网络上传输。Shuffle操作是最昂贵的,这也说明Shuffle对Spark Job的性能影响是很大的。通过Shuffle,将数据从一个分区移动到另一个分区,这样方便用不同的方式组织,例如:分组、Join这些操作等(先别着急跟我说Broadcast Join)。


简单用一个图来描述:

只了解MapReduce的Shuffle?Spark Shuffle了解一下

上图,我们假设使用每个executor处理一个分区的数据,在执行shuffle相关的算子操作,首先会进行shuffle写入(可理解为重新分区,例如:按照key重新分区)——这个阶段其实就类似于MapReduce的Map侧Shuffle。其他的Task(类似于Reduce侧Shuffle)会进行shuffle读取,拉取属于自己的那个分区。



3

Shuffle非常重要


在Spark程序运行过程中,有许多的任务需要对集群中的数据进行shuffle。例如:我们要通过某个字段来连接两个表,而在连接之前必须得保证所有该相同值的字段放在一起。如果该字段是一个整数,整数的范围是从1到5000W。这么大量的数据是无法直接进行表关联的。


我们需要进行分区处理。例如:字段值为1-100的存储在一个分区中,无需遍历第二张表的每个分区,直接将分区与分区进行关联。这样可以极大地缩小计算量,也使得大表关联成为可能。这里要求,两个表必须要有相同的分区,否则无法进行分区关联。大量数据的Shuffle并不总是一件坏事,我们通过Shuffle可以突破在内存上计算的限制。


通过这个示例,大家一定能感受到shuffle的重要性。


在RDD中,有这样的一些典型的shuffle操作:

  • subtractByKey

  • groupBy

  • foldByKey

  • reduceByKey

  • aggregateByKey

  • join

  • distinct

  • cogroup


2

Hash Shuffle与Sort Shuffle


1

Hash Shuffle如何工作

 


在Spark 1.2之前,Hash Shuffle是Spark shuffle的默认Shuffle方式。这种shuffle方式有很多问题。Hash Shuffle针对每一个Reduce Task都会创建一个单独的文件,从而会导致有MapTask数量 * ReduceTask数量个文件总数。如果我们运行一个Job有大量的分区,这会导致很大的问题。它需要更多的输出缓冲区、FS需要打开文件的数量更多、创建和删除这些文件速度也会变慢。


这种shuffle的方式实现方式很直接:


  • 计算Reducer的数量

  • 为每一个分区创建一个单独的文件

  • 遍历要输出的record,计算每一个record的分区

  • 将record输出到文件。

只了解MapReduce的Shuffle?Spark Shuffle了解一下

2

Spark对Hash Shuffle的优化


Hash Shuffle这样操作问题很大, Spark因此对它提供了优化。在1.2.x版本的Spark中,有这样的一个参数:spark.shuffle.consolidateFiles。默认为false,当设置它为true时,它可以将MapTask输出的文件进行合并。这种方式不是为每个Reducer创建一个文件,而是创建一个文件输出池。


假设:我们有JVM运行1个Executor,每个Executor上有4个Core可用于运行MapTask,每个Task一个core。所以该Executor最多并行执行4个MapTask。这种优化方式,每一次执行MapTask会打开一组文件,然后进行写入。并行最多打开3组文件。如果executor上要运行40个任务,那么下一次并行执行MapTask,将复用之前打开的文件继续输出。

只了解MapReduce的Shuffle?Spark Shuffle了解一下

3

Spark 2.0开始停用HashShuffle


通过GitHub,我们可以查看从Spark 2.0开始,ShuffleManager的实现只有一种:SortShuffle

https://github.com/apache/spark/tree/branch-2.0/core/src/main/scala/org/apache/spark/shuffle


只了解MapReduce的Shuffle?Spark Shuffle了解一下

只了解MapReduce的Shuffle?Spark Shuffle了解一下


虽然Spark 2.0后没有了HashShuffle的实现,但它可以通过配置:spark.shuffle.sort.bypassMergeThreshold来达到HashShuffle类似的效果,默认为200。如果shuffle MapTask侧的任务数量是小于这个值,且没有Map侧的聚合,Spark在Map端是不会进行Merge-Sort的,这个过程其实就是HashShuffle。


4

Sort Shuffle如何工作


Sort shuffle有点类似于MapReduce的方式,对比于Hash Shuffle,MapTask针对每一个ReduceTask生成一个独立的文件,Sort Shuffle会按照分区Id进行排序,并为每个数据文件提供索引。通过索引可以快速地根据位置获取数据了,读取数据只需要执行一次fseek设置某个ReduceTask的开始读取的位置即可。


Sort Shuffle默认不会在MapTask对数据进行排序,分区数量少情况下也不会进行合并,除非在Reduce侧有Sort操作。在Shuffle阶段,如果没有足够的内存,就需要将数据spill到磁盘上。默认能使用的内存是 JVM堆内存大小 * 0.6 * 0.5,如果一个executor中运行了多个线程(R = executor.cores / task.cpus),还要除以R。假设JVM堆内存是800M,每个executor是运行2个任务,每个任务1个core,那就是 800 * 0.3 / 2 = 120M。超过120M就要进行spill了。

Spark使用:

spark.memory.fraction(默认:0.6)

spark.memory.storageFraction(默认:0.5)

两个参数控制Shuffle内存容量。

每个溢写文件都分别写入到磁盘中,Spark的spill并不会像MapReduce那样进行磁盘上的合并,ReduceTask请求从executor中拉取数据时,会对数据进行合并,然后ReduceTask开始拉取。

# 总结

Spark之前的HashShuffle会有什么问题?

产生大量小文件,Number(MapTask) * Number(ReduceTask),分区多了性能下降严重


说一下Spark中的HashShuffle的工作方式?

对key值进行哈希分区,每个MapTask针对每个ReduceTask生成一个文件,然后ReduceTask拉取文件进行计算。


是不是HashShuffle一定比SortShuffle效率低?

不一定,如果说分区数量小的话,HashShuffle速度很快,因为都是顺序读写。而Sort Shuffle会有一些随机IO,分区少的时候,没有HashShuffle快。但分区数量非常多,Sort Shuffle可以极大到介绍文件数量,提升IO效率。


说一下Spark中的SortShuffle的工作方式?

MapTask侧根据分区ID进行排序,输出到单独的一个文件中,ReduceTask拉取自己分区的数据。


Spark中的SortShuffle和MapReduce Shuffle的区别?

Spark排序会根据下游算子判断是否需要在MapTask侧排序,否则是不会按key排序的,而且Spark SortShuffle不会像MR那样内存→磁盘、磁盘→磁盘做Merge。直接在Reduce侧Merge。


THE

END


推荐你看 

以上是关于Spark之SortShuffle原理参考的主要内容,如果未能解决你的问题,请参考以下文章

大数据之Spark:Spark 的两种核心 Shuffle

SortShuffle之UnsafeShuffleWriter

Spark原理 V3.0 新特性

Spark Shuffle原理详解

Spark Shuffle原理详解

只了解MapReduce的Shuffle?Spark Shuffle了解一下