Spark repartition和coalesce的区别

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark repartition和coalesce的区别相关的知识,希望对你有一定的参考价值。

参考技术A 有些时候,在很多partition的时候,我们想减少点partition的数量,不然写到HDFS上的文件数量也会很多很多。
我们使用reparation呢,还是coalesce。所以我们得了解这两个算子的内在区别。

要知道,repartition是一个消耗比较昂贵的操作算子,Spark出了一个优化版的repartition叫做coalesce,它可以尽量避免数据迁移,
但是你只能减少RDD的partition.

举个例子,有如下数据节点分布:

用coalesce,将partition减少到2个:

注意,Node1 和 Node3 不需要移动原始的数据

The repartition algorithm does a full shuffle and creates new partitions with data that’s distributed evenly.
Let’s create a DataFrame with the numbers from 1 to 12.

repartition 算法会做一个full shuffle然后均匀分布地创建新的partition。我们创建一个1-12数字的DataFrame测试一下。

刚开始数据是这样分布的:

我们做一个full shuffle,将其repartition为2个。

这是在我机器上数据分布的情况:
Partition A: 1, 3, 4, 6, 7, 9, 10, 12
Partition B: 2, 5, 8, 11

The repartition method makes new partitions and evenly distributes the data in the new partitions (the data distribution is more even for larger data sets).
repartition方法让新的partition均匀地分布了数据(数据量大的情况下其实会更均匀)

coalesce用已有的partition去尽量减少数据shuffle。
repartition创建新的partition并且使用 full shuffle。
coalesce会使得每个partition不同数量的数据分布(有些时候各个partition会有不同的size)
然而,repartition使得每个partition的数据大小都粗略地相等。

coalesce 与 repartition的区别(我们下面说的coalesce都默认shuffle参数为false的情况)

repartition(numPartitions:Int):RDD[T]和coalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T] repartition只是coalesce接口中shuffle为true的实现

有1w的小文件,资源也为--executor-memory 2g --executor-cores 2 --num-executors 5。
repartition(4):产生shuffle。这时会启动5个executor像之前介绍的那样依次读取1w个分区的文件,然后按照某个规则%4,写到4个文件中,这样分区的4个文件基本毫无规律,比较均匀。
coalesce(4):这个coalesce不会产生shuffle。那启动5个executor在不发生shuffle的时候是如何生成4个文件呢,其实会有1个或2个或3个甚至更多的executor在空跑(具体几个executor空跑与spark调度有关,与数据本地性有关,与spark集群负载有关),他并没有读取任何数据!

1.如果结果产生的文件数要比源RDD partition少,用coalesce是实现不了的,例如有4个小文件(4个partition),你要生成5个文件用coalesce实现不了,也就是说不产生shuffle,无法实现文件数变多。
2.如果你只有1个executor(1个core),源RDD partition有5个,你要用coalesce产生2个文件。那么他是预分partition到executor上的,例如0-2号分区在先executor上执行完毕,3-4号分区再次在同一个executor执行。其实都是同一个executor但是前后要串行读不同数据。与用repartition(2)在读partition上有较大不同(串行依次读0-4号partition 做%2处理)。

T表有10G数据 有100个partition 资源也为--executor-memory 2g --executor-cores 2 --num-executors 5。我们想要结果文件只有一个

Spark中repartition和coalesce的用法

参考技术A 在Spark的Rdd中,Rdd是分区的。

有时候需要重新设置Rdd的分区数量,比如Rdd的分区中,Rdd分区比较多,但是每个Rdd的数据量比较小,

每个任务计算的数据比较小时,计算速度有可能会变慢,因为处理的数据量小。但是任务的所需的调度时间会很多,

所以需要设置一个比较合理的分区。或者需要把Rdd的分区数量调大或是调小。

还有就是通过设置一个Rdd的分区来达到设置生成的文件的数量。

有两种方法是可以重设Rdd的分区:分别是 coalesce()方法和repartition()。

 这两个方法有什么区别,看看源码就知道了:

def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)

    : RDD[T] = withScope

  if (shuffle)

    /** Distributes elements evenly across output partitions, starting from a random partition. */

    val distributePartition = (index: Int, items: Iterator[T]) =>

      var position = (new Random(index)).nextInt(numPartitions)

      items.map t =>

        // Note that the hash code of the key will just be the key itself. The HashPartitioner

        // will mod it with the number of total partitions.

        position = position + 1

        (position, t)

      

     : Iterator[(Int, T)]

    // include a shuffle step so that our upstream tasks are still distributed

    new CoalescedRDD(

      new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),

      new HashPartitioner(numPartitions)),

      numPartitions).values

   else 

    new CoalescedRDD(this, numPartitions)

  



coalesce()方法的作用是返回指定一个新的指定分区的Rdd。

如果是生成一个窄依赖的结果,那么不会发生shuffle。比如:1000个分区被重新设置成10个分区,这样不会发生shuffle。

关于Rdd的依赖,这里提一下。Rdd的依赖分为两种:窄依赖和宽依赖。

窄依赖是指父Rdd的分区最多只能被一个子Rdd的分区所引用,即一个父Rdd的分区对应一个子Rdd的分区,或者多个父Rdd的分区对应一个子Rdd的分区。

而宽依赖就是宽依赖是指子RDD的分区依赖于父RDD的多个分区或所有分区,即存在一个父RDD的一个分区对应一个子RDD的多个分区。1个父RDD分区对应多个子RDD分区,这其中又分两种情况:1个父RDD对应所有子RDD分区(未经协同划分的Join)或者1个父RDD对应非全部的多个RDD分区(如groupByKey)。

如下图所示:map就是一种窄依赖,而join则会导致宽依赖

回到刚才的分区,如果分区的数量发生激烈的变化,如设置numPartitions = 1,这可能会造成运行计算的节点比你想象的要少,为了避免这个情况,可以设置shuffle=true,

那么这会增加shuffle操作。

关于这个分区的激烈的变化情况,比如分区数量从父Rdd的几千个分区设置成几个,有可能会遇到这么一个错误。

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 77.0 failed 4 times, most recent failure: Lost task 1.3 in stage 77.0 (TID 6334, 192.168.8.61): java.io.IOException: Unable to acquire 16777216 bytes of memory

这个错误只要把shuffle设置成true即可解决。这个bug在spark1.6版本中被修复

当把父Rdd的分区数量增大时,比如Rdd的分区是100,设置成1000,如果shuffle为false,并不会起作用。

这时候就需要设置shuffle为true了,那么Rdd将在shuffle之后返回一个1000个分区的Rdd,数据分区方式默认是采用 hash partitioner。

最后来看看repartition()方法的源码:

def repartition(numPartitions: Int)(implicit ord: Ordering[T] =null): RDD[T] = withScope

    coalesce(numPartitions, shuffle =true)

 

从源码可以看出,repartition()方法就是coalesce()方法shuffle为true的情况。

如果是减少分区数量建议采用coalesce(numPartitions, false)方法,这样可以避免shuffle导致数据混洗,从而提高效率!

以上是关于Spark repartition和coalesce的区别的主要内容,如果未能解决你的问题,请参考以下文章

Spark RDD 默认分区数量 - repartitions和coalesce异同

Spark 重分区函数:coalesce和repartition区别与实现,可以优化Spark程序性能

Spark transformation算子之coalesce&&repartition

Spark——算子之间的区别

coalesce和repartition的区别

coalesce