Spark Sort Merge Join 是不是涉及洗牌阶段?
Posted
技术标签:
【中文标题】Spark Sort Merge Join 是不是涉及洗牌阶段?【英文标题】:Does Spark Sort Merge Join involve a shuffle phase?Spark Sort Merge Join 是否涉及洗牌阶段? 【发布时间】:2021-09-03 17:58:04 【问题描述】:我对 Sort Merge Join 是否在排序阶段之前涉及一个 shuffle 阶段感到有点困惑。有的文章说可以,但是为什么不叫Shuffle Sort Merge Join,和Shuffle Hash Join比较一致。
【问题讨论】:
【参考方案1】:TLDR:是的,Spark Sort Merge Join 涉及一个 shuffle 阶段。而且我们可以推测它之所以不叫Shuffle Sort Merge Join,是因为没有Broadcast Sort Merge Join可以区分。
用一个例子理解 Spark Sort Merge Join
Spark 的排序合并连接算法使用 shuffle 将数据分布在执行程序之间。让我们用一个例子来看看。
假设你想加入以下datasetA
:
id | value |
---|---|
3 | a3 |
1 | a1 |
4 | a4 |
2 | a2 |
有以下datasetB
:
id | value |
---|---|
2 | b2 |
4 | b4 |
3 | b3 |
1 | b1 |
为此,您在 2 个执行器上拥有一个 Spark 应用程序,并使用排序合并策略。让我们详细说明每个步骤。
1。您根据分区函数对数据进行洗牌
假设我们使用 模 2 分区函数。数据将在两个执行器上重新分配,如下所示:
执行者 1
执行器 1 获取 id 值为 1
模 2 的行,因此 id 为 1
和 3
数据集A
id | valueA |
---|---|
3 | a3 |
1 | a1 |
数据集B
id | valueB |
---|---|
3 | b3 |
1 | b1 |
执行者 2
执行器 2 获取 id 值为 0
模 2 的行,因此 id 为 2
和 4
数据集A
id | valueA |
---|---|
4 | a4 |
2 | a2 |
数据集B
id | valueB |
---|---|
2 | b2 |
4 | b4 |
2。您对每个执行器上的数据集进行排序
执行者 1
数据集A
id | valueA |
---|---|
1 | a1 |
3 | a3 |
数据集B
id | valueB |
---|---|
1 | b1 |
3 | b3 |
执行者 2
数据集A
id | valueA |
---|---|
2 | a2 |
4 | a4 |
数据集B
id | valueB |
---|---|
2 | b2 |
4 | b4 |
3。你执行加入
执行者 1
id | valueA | valueB |
---|---|---|
1 | a1 | b1 |
3 | a3 | b3 |
执行者 2
id | valueA | valueB |
---|---|---|
2 | a2 | b2 |
4 | a4 | b4 |
完成最终数据集
id | valueA | valueB |
---|---|---|
1 | a1 | b1 |
3 | a3 | b3 |
2 | a2 | b2 |
4 | a4 | b4 |
因此,当您使用排序合并策略时,您首先要打乱数据。
代码参数
如果您查看Spark's code,您会看到执行排序合并连接的类SortMergeJoinExec
扩展了特征ShuffledJoin
。所以代码告诉你在执行Sort Merge Join时有一个shuffle。
那为什么不叫 Shuffled Sort Merge Join 呢?
在经典的关系数据库管理系统中,由于一切都在同一个执行器/服务器/机器上完成,您只需要选择加入策略。主要的加入策略是:
hash join:最有效的一种,使用连接键的哈希来挑选匹配的行 sort merge join nested loop join:这是一种朴素的算法,遍历两个数据集的所有行,只匹配一个由于 Spark 是一个在多个执行器上运行的分布式计算框架,因此您首先必须将大数据集拆分为较小的部分,这些部分可以独立分布在将应用这些连接算法的每个执行器上。为此,您有两种策略:
广播:将最小的数据集复制到每个执行器。由于 executor 拥有最小数据集中的所有数据,它可以将其与最大数据集的一部分连接起来,而无需依赖其他 executor。 Spark 会在最小适合执行程序的内存时选择此策略。 shuffle/repartition:将两个数据集的部分复制到每个执行器。对于每个执行程序,您应该拥有两个数据集的正确部分,以防止在执行联接时必须从其他执行程序检索数据。所以当你要求 Spark 连接两个数据集时,Spark 需要选择两种策略:它如何在 executor 之间分配数据(广播或 shuffle)以及它如何执行实际连接(排序合并连接、哈希连接或嵌套循环连接)。这两种策略的组合给出了 Spark 的连接策略:
广播哈希加入 随机散列连接 广播嵌套循环连接 笛卡尔积 排序合并加入我们可以看到Hash Join是唯一结合了Broadcast和Shuffle两种不同分发策略的join策略。所以我们可以猜测添加了Shuffled
前缀是为了避免Hash Join with Broadcast和Hash Join with Shuffle之间的混淆。
因此我们可以想象,由于没有Broadcast Sort Merge Join,所以不需要将Shuffled
作为前缀,这就是Sort Merge Join策略不称为Shuffled Sort Merge Join的原因。
【讨论】:
以上是关于Spark Sort Merge Join 是不是涉及洗牌阶段?的主要内容,如果未能解决你的问题,请参考以下文章
Sort merge joinNested loopsHash join(三种连接类型)
SSIS中 merge join与lookup 哪个性能更好些