Spark Sort Merge Join 是不是涉及洗牌阶段?

Posted

技术标签:

【中文标题】Spark Sort Merge Join 是不是涉及洗牌阶段?【英文标题】:Does Spark Sort Merge Join involve a shuffle phase?Spark Sort Merge Join 是否涉及洗牌阶段? 【发布时间】:2021-09-03 17:58:04 【问题描述】:

我对 Sort Merge Join 是否在排序阶段之前涉及一个 shuffle 阶段感到有点困惑。有的文章说可以,但是为什么不叫Shuffle Sort Merge Join,和Shuffle Hash Join比较一致。

【问题讨论】:

【参考方案1】:

TLDR:是的,Spark Sort Merge Join 涉及一个 shuffle 阶段。而且我们可以推测它之所以不叫Shuffle Sort Merge Join,是因为没有Broadcast Sort Merge Join可以区分。

用一个例子理解 Spark Sort Merge Join

Spark 的排序合并连接算法使用 shuffle 将数据分布在执行程序之间。让我们用一个例子来看看。

假设你想加入以下datasetA

id value
3 a3
1 a1
4 a4
2 a2

有以下datasetB:

id value
2 b2
4 b4
3 b3
1 b1

为此,您在 2 个执行器上拥有一个 Spark 应用程序,并使用排序合并策略。让我们详细说明每个步骤。

1。您根据分区函数对数据进行洗牌

假设我们使用 模 2 分区函数。数据将在两个执行器上重新分配,如下所示:

执行者 1

执行器 1 获取 id 值为 1 模 2 的行,因此 id 为 13

数据集A
id valueA
3 a3
1 a1
数据集B
id valueB
3 b3
1 b1
执行者 2

执行器 2 获取 id 值为 0 模 2 的行,因此 id 为 24

数据集A
id valueA
4 a4
2 a2
数据集B
id valueB
2 b2
4 b4

2。您对每个执行器上的数据集进行排序

执行者 1
数据集A
id valueA
1 a1
3 a3
数据集B
id valueB
1 b1
3 b3
执行者 2
数据集A
id valueA
2 a2
4 a4
数据集B
id valueB
2 b2
4 b4

3。你执行加入

执行者 1
id valueA valueB
1 a1 b1
3 a3 b3
执行者 2
id valueA valueB
2 a2 b2
4 a4 b4

完成最终数据集

id valueA valueB
1 a1 b1
3 a3 b3
2 a2 b2
4 a4 b4

因此,当您使用排序合并策略时,您首先要打乱数据。

代码参数

如果您查看Spark's code,您会看到执行排序合并连接的类SortMergeJoinExec 扩展了特征ShuffledJoin。所以代码告诉你在执行Sort Merge Join时有一个shuffle。

那为什么不叫 Shuffled Sort Merge Join 呢?

在经典的关系数据库管理系统中,由于一切都在同一个执行器/服务器/机器上完成,您只需要选择加入策略。主要的加入策略是:

hash join:最有效的一种,使用连接键的哈希来挑选匹配的行 sort merge join nested loop join:这是一种朴素的算法,遍历两个数据集的所有行,只匹配一个

由于 Spark 是一个在多个执行器上运行的分布式计算框架,因此您首先必须将大数据集拆分为较小的部分,这些部分可以独立分布在将应用这些连接算法的每个执行器上。为此,您有两种策略:

广播:将最小的数据集复制到每个执行器。由于 executor 拥有最小数据集中的所有数据,它可以将其与最大数据集的一部分连接起来,而无需依赖其他 executor。 Spark 会在最小适合执行程序的内存时选择此策略。 shuffle/repartition:将两个数据集的部分复制到每个执行器。对于每个执行程序,您应该拥有两个数据集的正确部分,以防止在执行联接时必须从其他执行程序检索数据。

所以当你要求 Spark 连接两个数据集时,Spark 需要选择两种策略:它如何在 executor 之间分配数据(广播或 shuffle)以及它如何执行实际连接(排序合并连接、哈希连接或嵌套循环连接)。这两种策略的组合给出了 Spark 的连接策略:

广播哈希加入 随机散列连接 广播嵌套循环连接 笛卡尔积 排序合并加入

我们可以看到Hash Join是唯一结合了Broadcast和Shuffle两种不同分发策略的join策略。所以我们可以猜测添加了Shuffled前缀是为了避免Hash Join with Broadcast和Hash Join with Shuffle之间的混淆。

因此我们可以想象,由于没有Broadcast Sort Merge Join,所以不需要将Shuffled作为前缀,这就是Sort Merge Join策略不称为Shuffled Sort Merge Join的原因。

【讨论】:

以上是关于Spark Sort Merge Join 是不是涉及洗牌阶段?的主要内容,如果未能解决你的问题,请参考以下文章

Sort merge joinNested loopsHash join(三种连接类型)

11g sort merge join

SSIS中 merge join与lookup 哪个性能更好些

谁能真正理解hash join/nested loop/merge join

Python数据分析pands中的Merge与join

dplyr::left_join 是不是等同于 base::merge(..., all.x=TRUE)?