Spark Dataframe Join shuffle

Posted

技术标签:

【中文标题】Spark Dataframe Join shuffle【英文标题】: 【发布时间】:2019-06-14 09:16:24 【问题描述】:

Spark 版本 1.6.0

我在两个具有 100 个分区的数据帧之间使用连接函数,该应用程序在一个集群上运行,我为每 20 个执行程序使用 5 个内核,总共 100 个内核。

我的问题是,当我进行连接时,所有记录都在一个执行器上计算,而其他执行器没有使用,如下图所示:

这会导致性能下降,因为所有数据都是使用一个执行器与其他 19 个可用的执行器计算的。

看起来 spark join 只在一个分区中“带来”所有记录,有没有办法避免这种情况?

为了确保它不会重新分配到 1,我还设置了这个 spark 属性:spark.sql.shuffle.partitions=100 实际上,两个输入数据帧有 100 个与输出数据帧相同的分区

【问题讨论】:

【参考方案1】:

简答:

这是因为你的数据,而不是因为 spark。

长答案:

为了执行join 操作,spark 需要将具有相同键(您要加入的列的值)的数据移动到相同的工作人员。例如。如果您将 A 列与 B 列连接,则两个表中包含相同值的行将被移动到相同的工作人员,然后再连接。

此外 - 具有不同键的行也可能移动到同一个节点 - 这取决于您拥有的分区器。您可以阅读更多 here - 但一般的想法是默认分区器 - HashPartitionerRangePartitioner。不管使用哪一个 - 它决定了哪个工人行。例如 - 如果您有 RangePartitioner 范围为 [0, 5)[5. 7)[7, 10] 然后键 1, 2, 3, 4 将全部交给同一个工人。如果您的数据中只有这些键 - 将只使用一名工作人员。

【讨论】:

以上是关于Spark Dataframe Join shuffle的主要内容,如果未能解决你的问题,请参考以下文章

Spark DataFrame join,需要两列,怎么做

Spark Dataframe Join shuffle

Spark SQL中Dataframe join操作含null值的列

Spark PairRDDs 和 DataFrames 是不是被索引?

Spark SQL 之 Join 实现

Spark join和cogroup算子