spark.sql.shuffle.partitions 的 200 个默认分区难题

Posted

技术标签:

【中文标题】spark.sql.shuffle.partitions 的 200 个默认分区难题【英文标题】:spark.sql.shuffle.partitions of 200 default partitions conundrum 【发布时间】:2018-08-21 13:39:26 【问题描述】:

在许多帖子中,由于关于洗牌、分区、JOIN、AGGR 等的一些问题,有这样或另一种形式的声明 - 如下所示:

... 通常,每当您执行 spark sql 聚合或连接数据时,这是 resulting 分区数 = 200。 这是由 spark.sql.shuffle.partitions 设置的。 ...

所以,我的问题是:

我们的意思是如果我们为 DF 设置了 765 的分区,例如, 该处理针对 765 个分区进行,但输出按标准合并/重新分区为 200 - 此处指的是词 resulting? 还是在 JOINing、AGGR 之前合并/重新分区到 200 个分区后使用 200 个分区进行处理?

我问,因为我从来没有看到一个明确的观点。

我做了以下测试:

// genned a DS of some 20M short rows
df0.count
val ds1 = df0.repartition(765)
ds1.count
val ds2 = df0.repartition(765)
ds2.count

sqlContext.setConf("spark.sql.shuffle.partitions", "765")
// The above not included on 1st run, the above included on 2nd run.

ds1.rdd.partitions.size
ds2.rdd.partitions.size

val joined = ds1.join(ds2, ds1("time_asc") === ds2("time_asc"), "outer") 
joined.rdd.partitions.size
joined.count
joined.rdd.partitions.size

在第一次测试 - 未定义 sqlContext.setConf("spark.sql.shuffle.partitions", "765"),处理结果和分区数为200. 尽管 SO post 45704156 声明它可能不适用于 DF - 这是一个 DS。

第二次测试 - defining sqlContext.setConf("spark.sql.shuffle.partitions", "765"),处理结果和分区数为 765 . 尽管 SO post 45704156 声明它可能不适用于 DF - 这是一个 DS。

【问题讨论】:

765 只是我决定的一个数字。 什么是 SO 帖子 45704156 *** 帖子 4.... 【参考方案1】:

这是你的猜测的组合。

假设您有一组包含 M 个分区的输入数据,并且您将 shuffle partitions 设置为 N。

在执行连接时,spark 会读取所有 M 个分区中的输入数据,并根据 key 将数据重新洗牌到 N 个分区。想象一个普通的 hashpartitioner,应用于 key 的 hash 函数看起来很像 A = hashcode(key) % N,然后这个数据被重新分配给负责处理 Ath 分区的节点。每个节点可以负责处理多个分区。

洗牌后,节点将工作以聚合它们负责的分区中的数据。由于这里不需要进行额外的洗牌,节点可以直接产生输出。

因此,总而言之,您的输出将被合并到 N 个分区,但是它是合并的,因为它是在 N 个分区中处理的,而不是因为 spark 应用了一个额外的 shuffle 阶段来专门将您的输出数据重新分区到 N。

【讨论】:

【参考方案2】:

Spark.sql.shuffle.partitions 是决定分区数量的参数,同时进行诸如连接或聚合之类的混洗,即跨节点的数据移动位置。另一部分 spark.default.parallelism 将根据您的数据大小和最大块大小计算,在 HDFS 中为 128mb。因此,如果您的工作没有进行任何 shuffle,它将考虑默认的并行度值,或者如果您使用的是 rdd,您可以自己设置它。洗牌时需要 200 次。

Val df = sc.parallelize(List(1,2,3,4,5),4).toDF() df.count() // 这将使用 4 个分区

Val df1 = df df1.except(df).count // 将生成 200 个具有 2 个阶段的分区

【讨论】:

但是我举了一个765分区的例子。我提到这一点是因为许多帖子中存在歧义,并且问题一次又一次地出现。我今晚要进行测试,然后会回来。 让我试试 765 我尝试使用具有 765 个分区的相同示例和第一个带有减号查询的实例,它生成了 766 个任务。每个分区有 765 个任务,另外还有一个用于组合最终结果。然后使用 except 生成 1166 个任务。 765 个任务用于映射分区,另外 200 个任务用于另外 2 个用于 shuffle 的阶段,1 个用于组合 这是一个更好的洞察力。我会做一个额外的测试,晚饭后回来。 请告诉我你的观察结果

以上是关于spark.sql.shuffle.partitions 的 200 个默认分区难题的主要内容,如果未能解决你的问题,请参考以下文章