如何在唯一键上加入 DataFrame 时避免洗牌?

Posted

技术标签:

【中文标题】如何在唯一键上加入 DataFrame 时避免洗牌?【英文标题】:How to avoid shuffles while joining DataFrames on unique keys? 【发布时间】:2017-05-07 12:19:57 【问题描述】:

我有两个 DataFrame AB

A 的列 (id, info1, info2) 大约有 2 亿行 B 只有列 id 有 100 万行

id 列在两个 DataFrame 中都是唯一的。

我想要一个新的 DataFrame,它过滤 A 以仅包含来自 B 的值。

如果 B 非常小,我知道我会这样做

A.filter($("id") isin B("id"))

B 仍然很大,所以不是所有的都可以作为广播变量。

我知道我可以使用

A.join(B, Seq("id"))

但这不会利用独特性,恐怕会导致不必要的洗牌。

完成该任务的最佳方法是什么?

【问题讨论】:

是什么让你觉得“我怕会造成不必要的洗牌” 我相信 Spark 不会将所有的小数据帧存储到所有节点,导致它在加入时会随机播放。此外,如果 spark 知道唯一性,如果找到一个值,它可能会停止发送值。如果我错了,请纠正我。 听起来是对的,但考虑到所有 Spark 优化都是相当新的/年轻的并且不一定经过实战考验,请根据具体情况猜测。 【参考方案1】:

如果您没有在 Dataframe A 上应用任何分区器,这可能会帮助您理解 Join 和 Shuffle 的概念。

没有分区器:

A.join(B, Seq("id"))

默认情况下,此操作将对两个数据帧的所有键进行散列,通过网络将具有相同键散列的元素发送到同一台机器,然后在该机器上将具有相同键的元素连接在一起。在这里,您必须注意到两个数据帧都在网络中随机播放。

使用 HashPartitioner: 在构建 Dataframe 时调用 partitionBy(),Spark 现在会知道它是散列分区的,并且调用 join() 将利用此信息。特别是,当我们调用 A.join(B, Seq("id")) 时,Spark 将只打乱 B RDD。由于 B 的数据比 A 少,因此您不需要在 B 上应用分区器

例如:

 val A = sc.sequenceFile[id, info1, info2]("hdfs://...")
     .partitionBy(new HashPartitioner(100)) // Create 100 partitions
     .persist()
 A.join(B, Seq("id"))

参考来自 Learning Spark 书籍。

【讨论】:

partitionBy 的“成本”呢?它不会在join 之前造成洗牌,如此有效地成本(= 洗牌次数)将是相同的吗? @JacekLaskowski 我同意 partitionBy 如果您在现有 RDD 上应用它会花费您,但是在这里当您构建 Dataframe/RDD 本身时,您正在应用 partitionBy,所以它不会花费您额外的费用。 那么,没有partitionBy 的“纯”joinpartitionBy 之间的区别是什么?没区别? 纯连接:默认相同键的数据会分裂到不同的分区,所以当你加入时,第一步是将所有具有相同键的数据移动到同一个分区。 join with partitionBy :当您创建数据框时,数据将以这样的方式分区,即相同的关键数据将成为同一分区的一部分。因此加入不需要将所有具有相同键的数据移动到同一个分区,这将为您节省更多的洗牌。 出于好奇,如果你这样做B.join(A, Seq("id")),spark 是否足够聪明,仍然只能洗牌 B RDD`?【参考方案2】:

我对如何优化连接的默认建议是:

    如果可以,请使用广播联接(从您的问题看来,您的表很大,广播联接不是一种选择)。 Spark 中的一个选项是执行广播连接(在 hadoop 世界中也称为 map-side join)。使用广播连接,您可以非常有效地将大表(事实)与相对较小的表(维度)连接起来,避免通过网络发送大表的所有数据。

    在连接运算符中使用时,您可以使用广播功能来标记要广播的数据集。它使用 spark.sql.autoBroadcastJoinThreshold 设置来控制执行连接时将广播到所有工作节点的表的大小。

    使用相同的分区器。 如果两个 RDD 具有相同的 partitioner,则 join 不会导致 shuffle。但是请注意,缺少 shuffle 并不意味着不必在节点之间移动数据。两个 RDD 可能具有相同的分区器(共同分区),但相应的分区位于不同的节点上(不位于同一位置)。 这种情况仍然比洗牌要好,但要记住这一点。托管可以提高性能,但很难保证。

    如果数据量很大和/或您的集群无法增长,以至于即使上述 (2) 导致 OOM,请使用两遍方法。首先,重新分区数据并使用分区表(dataframe.write.partitionBy())进行持久化。然后,在一个循环中串行连接子分区,“附加”到同一个最终结果表。

【讨论】:

我在我的问题中使用了广播,但是当我重新运行几次(测试不同的条件)时,我相信它会以某种方式卡在内存中。如何以可以重用以前的广播或只是覆盖它的方式进行操作?谢谢!【参考方案3】:

如果我正确理解您的问题,您想使用广播连接,在每个节点上复制 DataFrame B,以便进行半连接计算(即,使用连接从 DataFrame A 过滤 id)可以在每个节点上独立计算,而不必在彼此之间来回传递信息(即随机连接)。

您可以运行显式调用广播连接的连接函数来实现您想要做的事情:

import org.apache.spark.sql.functions.broadcast

val joinExpr = A.col("id") === B.col("id")

val filtered_A = A.join(broadcast(B), joinExpr, "left_semi")

您可以运行filtered_A.explain() 来验证是否正在使用广播连接。

【讨论】:

以上是关于如何在唯一键上加入 DataFrame 时避免洗牌?的主要内容,如果未能解决你的问题,请参考以下文章

如何对数据进行物理分区以避免 Spark SQL 连接中的洗牌

如何避免pyspark中加入操作中的过度洗牌?

Bigtable:在行键上使用时间戳时避免热点

改组/排列熊猫中的DataFrame

在两个键上加入/合并两个查询集

如何将系列加入数据框?