spark 4种 shuffle机制与mapreduce shuffle机制对比

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark 4种 shuffle机制与mapreduce shuffle机制对比相关的知识,希望对你有一定的参考价值。

参考技术A 纵观整个 mapreduce过程 会发现存在许多的排序和文件合并操作。

1、key的存在combiner操作,排序之后相同的key放到一块显然方便做合并操作。

2、reduce task是按key去处理数据的。 如果没有排序那必须从所有数据中把当前相同key的所有value数据拿出来,然后进行reduce逻辑处理。显然每个key到这个逻辑都需要做一次全量数据扫描,影响性能,有了排序很方便的得到一个key对于的value集合。

3、reduce task按key去处理数据时,如果key按顺序排序,那么reduce task就按key顺序去读取,显然当读到的key是文件末尾的key那么久标志数据处理完毕。如果没有排序那还得有其他逻辑来记录哪些key处理完了,哪些key没有处理完。

1、因为内存放不下就会溢写文件,就会发生多次溢写,形成很多小文件,如果不合并,显然会小文件泛滥,集群需要资源开销去管理这些小文件数据。

2、任务去读取文件的数增多,打开的文件句柄数也会增多

3、mapreduce是全局有序。单个文件有序,不代表全局有序,只有把小文件合并一起排序才会全局有序。

虽有千万种理由需要这么做,但是很耗资源,并且像排序其实我们有些业务并不需要排序。在hadoop 2.x 排序就变为可选了。

spark的shuffle是在mapreduce shuffle基础上进行的调优。其实就是对排序、合并逻辑做了一些优化。在spark中shuffle write相当于mapreduce 的map,shuffle reade相当于mapreduce 的reduce.

spark shuffle分4种

在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager

从图中可以看到,相比mapreduce,排序不见了,文件合并不见了。上游task写文件的时候只是将数据按分区追加到文件中,并没有像mapreduce 那样先内存溢写成文件,然后再文件与文件之间进行合并,虽然节省了排序、合并的开销。但有一个弊端就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。如上图 下游有3个shuffle reade task ,那每个上游shuffle write就会形成3个文件。 形成的文件数是 shuffle reade个数 × shuffle write个数。

相比第一种机制。就是在一个excutor中的task是可以共用一个buffer内存。在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了,而是允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。此时的文件个数是 CPU core的数量  × 下一个stage的task数量。

为了开启优化后的HashShuffleManager,我们可以设置一个参数,spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项。

在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager

这种机制和mapreduce差不多,在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。

一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。

SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量,由于每个task最终只有一个磁盘文件所以文件个数等于上游shuffle write个数。

相比第3中少了排序,task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。

该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

bypass运行机制的触发条件如下:

1、shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。

2、不是聚合类的shuffle算子(比如reduceByKey)。因为不像第3种机制那样会对聚合类算子以map的数据结构存储,在写的过程中会先进行局部聚合。

spark shuffle 优于mapreduce shuffle的原因1、减少了磁盘io

2、可选的shuffle和排序

Spark的Shuffle机制

什么是Shuffle

在RDD中,将每个相同key的value聚合起来。相同key的value可能在不同partition,也可能在不同节点。因此shuffle操作会影响多个节点。

常见的shuffle操作有:groupByKey(),reduceBykey()等。

Shuffle Write和Read

Shuffle Write:上一个stage的每个map task会将处理好的相同key的数据写入一个分区文件。

Shuffle Read:reduce task就会从上一个stage的节点上寻找属于自己的分区文件,将相同key对应的value汇聚到同一个节点。

Shuffle的类型

Spark的Shuffle共有两类:HashShuffle和SortShuffle。之前默认是HashShuffle,现在默认是SortShuffle。

HashShuffle

执行流程

  1. 每个map task将不同key的数据写到不同的buffer中
  2. 每个buffer对应一个分区文件,即磁盘小文件
  3. reduce task来拉取对应的磁盘小文件

其中每个map task会有reduce task数量的分区文件,因此产生的磁盘小文件个数为:M*R(M为map task个数,R为reduce task个数)

存在的问题

主要问题是磁盘小文件过多,磁盘小文件过多会衍生出很多问题:

  • Write过程中会产生很多写入文件对象,要写入数据
  • Read过程中会产生很多读取文件对象,要读取数据
  • 对象过多,会造成JVM堆内存频繁的GC,而且如果GC还提供不了相应的内存,最终会OOM
  • 小文件数量很多,网络通信消耗也大

改进

前面是每个map task产生相应的reduce task个数的小文件。

合并机制:合并后,每个core产生对应的reduce task个数的小文件,即每个Executor产生R个,产生磁盘小文件总数:C*R(C为core的个数,R为reduce task个数)

减少了小文件数量。

SortShuffle

执行流程

  1. map task将数据结果写入内存
  2. 对内存中数据进行排序分区
  3. 溢写到磁盘,形成多个小文件
  4. 将小文件合并为一个大文件,同时生成一个索引文件
  5. reduce task去每个map task靠索引文件去数据文件拉去数据

可以发现SortShuffle的执行过程和MapReduce的Shuffle很相似,其最终只生成一个数据文件和索引文件。生成文件个数:2*M

改进,bypass机制

bupassSortShuffle少了排序的步骤。

触发条件为shuffle reduce task要小于spark.shuffle.sort.bypassMergeThreshold的参数值。产生的文件个数:2*M。

排序机制

在内存中,对数据进行排序,然后将数据写入磁盘。假设数据有100w,每次读取10w数据排序写入,就有10个文件。这时候读取10个文件的头部数据,然后采取堆排序写入最终有序的文件,由此可以形成全局有序。

Shuffle文件寻址

MapOutputTrack

负责管理磁盘小文件的地址。有MapOutputTrackerMaster存在于Driver节点,和MapOutputTrackerWorker存在于Executor节点。

BlockManager

负责管理块。有BlockManagerMaster存在Driver节点,和BlockManagerWorker存在于Executor.

寻址流程

map task执行完后,map task会将磁盘小文件的地址通过MapOutputTrackerWorker发送给Driver的MapOutputTrackerMaster,MapOutputTrackerMaster再将磁盘地址发送给reduce task端的MapOutputTrackerWorker。最后reduce task端的BlockManagerWorker和map task端的BlockManagerWorker通信,拉取数据。

以上是关于spark 4种 shuffle机制与mapreduce shuffle机制对比的主要内容,如果未能解决你的问题,请参考以下文章

大数据之Spark:Spark 的两种核心 Shuffle

Spark的两种核心Shuffle详解(面试常问,工作常用)

Spark核心机制总结

Spark学习之路 SparkCore的调优之Shuffle调优

Spark的Shuffle机制

Spark Shuffle 机制解析