如何在唯一键上加入 DataFrame 时避免洗牌?
Posted
技术标签:
【中文标题】如何在唯一键上加入 DataFrame 时避免洗牌?【英文标题】:How to avoid shuffles while joining DataFrames on unique keys? 【发布时间】:2017-05-07 12:19:57 【问题描述】:我有两个 DataFrame A
和 B
:
A
的列 (id, info1, info2)
大约有 2 亿行
B
只有列 id
有 100 万行
id
列在两个 DataFrame 中都是唯一的。
我想要一个新的 DataFrame,它过滤 A
以仅包含来自 B
的值。
如果 B 非常小,我知道我会这样做
A.filter($("id") isin B("id"))
但B
仍然很大,所以不是所有的都可以作为广播变量。
我知道我可以使用
A.join(B, Seq("id"))
但这不会利用独特性,恐怕会导致不必要的洗牌。
完成该任务的最佳方法是什么?
【问题讨论】:
是什么让你觉得“我怕会造成不必要的洗牌”? 我相信 Spark 不会将所有的小数据帧存储到所有节点,导致它在加入时会随机播放。此外,如果 spark 知道唯一性,如果找到一个值,它可能会停止发送值。如果我错了,请纠正我。 听起来是对的,但考虑到所有 Spark 优化都是相当新的/年轻的并且不一定经过实战考验,请根据具体情况猜测。 【参考方案1】:如果您没有在 Dataframe A 上应用任何分区器,这可能会帮助您理解 Join 和 Shuffle 的概念。
没有分区器:
A.join(B, Seq("id"))
默认情况下,此操作将对两个数据帧的所有键进行散列,通过网络将具有相同键散列的元素发送到同一台机器,然后在该机器上将具有相同键的元素连接在一起。在这里,您必须注意到两个数据帧都在网络中随机播放。
使用 HashPartitioner: 在构建 Dataframe 时调用 partitionBy(),Spark 现在会知道它是散列分区的,并且调用 join() 将利用此信息。特别是,当我们调用 A.join(B, Seq("id")) 时,Spark 将只打乱 B RDD。由于 B 的数据比 A 少,因此您不需要在 B 上应用分区器
例如:
val A = sc.sequenceFile[id, info1, info2]("hdfs://...")
.partitionBy(new HashPartitioner(100)) // Create 100 partitions
.persist()
A.join(B, Seq("id"))
参考来自 Learning Spark 书籍。
【讨论】:
partitionBy
的“成本”呢?它不会在join
之前造成洗牌,如此有效地成本(= 洗牌次数)将是相同的吗?
@JacekLaskowski 我同意 partitionBy 如果您在现有 RDD 上应用它会花费您,但是在这里当您构建 Dataframe/RDD 本身时,您正在应用 partitionBy,所以它不会花费您额外的费用。
那么,没有partitionBy
的“纯”join
与partitionBy
之间的区别是什么?没区别?
纯连接:默认相同键的数据会分裂到不同的分区,所以当你加入时,第一步是将所有具有相同键的数据移动到同一个分区。 join with partitionBy :当您创建数据框时,数据将以这样的方式分区,即相同的关键数据将成为同一分区的一部分。因此加入不需要将所有具有相同键的数据移动到同一个分区,这将为您节省更多的洗牌。
出于好奇,如果你这样做B.join(A, Seq("id"))
,spark 是否足够聪明,仍然只能洗牌 B RDD`?【参考方案2】:
我对如何优化连接的默认建议是:
如果可以,请使用广播联接(从您的问题看来,您的表很大,广播联接不是一种选择)。 Spark 中的一个选项是执行广播连接(在 hadoop 世界中也称为 map-side join)。使用广播连接,您可以非常有效地将大表(事实)与相对较小的表(维度)连接起来,避免通过网络发送大表的所有数据。
在连接运算符中使用时,您可以使用广播功能来标记要广播的数据集。它使用 spark.sql.autoBroadcastJoinThreshold 设置来控制执行连接时将广播到所有工作节点的表的大小。
使用相同的分区器。 如果两个 RDD 具有相同的 partitioner,则 join 不会导致 shuffle。但是请注意,缺少 shuffle 并不意味着不必在节点之间移动数据。两个 RDD 可能具有相同的分区器(共同分区),但相应的分区位于不同的节点上(不位于同一位置)。 这种情况仍然比洗牌要好,但要记住这一点。托管可以提高性能,但很难保证。
如果数据量很大和/或您的集群无法增长,以至于即使上述 (2) 导致 OOM,请使用两遍方法。首先,重新分区数据并使用分区表(dataframe.write.partitionBy())进行持久化。然后,在一个循环中串行连接子分区,“附加”到同一个最终结果表。
【讨论】:
我在我的问题中使用了广播,但是当我重新运行几次(测试不同的条件)时,我相信它会以某种方式卡在内存中。如何以可以重用以前的广播或只是覆盖它的方式进行操作?谢谢!【参考方案3】:如果我正确理解您的问题,您想使用广播连接,在每个节点上复制 DataFrame B
,以便进行半连接计算(即,使用连接从 DataFrame A
过滤 id
)可以在每个节点上独立计算,而不必在彼此之间来回传递信息(即随机连接)。
您可以运行显式调用广播连接的连接函数来实现您想要做的事情:
import org.apache.spark.sql.functions.broadcast
val joinExpr = A.col("id") === B.col("id")
val filtered_A = A.join(broadcast(B), joinExpr, "left_semi")
您可以运行filtered_A.explain()
来验证是否正在使用广播连接。
【讨论】:
以上是关于如何在唯一键上加入 DataFrame 时避免洗牌?的主要内容,如果未能解决你的问题,请参考以下文章