如何对数据进行物理分区以避免 Spark SQL 连接中的洗牌

Posted

技术标签:

【中文标题】如何对数据进行物理分区以避免 Spark SQL 连接中的洗牌【英文标题】:How to physically partition data to avoid shuffle in Spark SQL joins 【发布时间】:2016-10-24 19:07:22 【问题描述】:

我需要加入 5 个中等大小的表(每个约 80 gb),输入数据约为 800 gb。所有数据都驻留在 HIVE 表中。 我正在使用 Spark SQL 1.6.1 来实现这一点。 加入需要 40 分钟才能完成 --num-executors 20 --driver-memory 40g --executor-memory 65g --executor-cores 6。所有连接都是排序合并外连接。也看到很多洗牌发生。

我将 hive 中的所有表分桶到相同数量的桶中,以便在首先加载数据本身时来自所有表的相似键将转到相同的 spark 分区。但似乎 spark 不理解分桶。

有没有其他方法我可以在 Hive 中对数据进行物理分区和排序(没有部分文件),以便 spark 在从 hive 本身加载数据时知道分区键,并在同一个分区中进行连接,而不需要对数据进行混洗?这将避免在从 hive 加载数据后进行额外的重新分区。

【问题讨论】:

【参考方案1】:

首先,Spark Sql 1.6.1 还不支持 hive 存储桶。 因此,在这种情况下,我们只剩下 Spark 级别的操作,以确保所有表在加载数据时都必须转到相同的 spark 分区。 Spark API 提供了 repartition 和 sortWithinPartitions 来实现相同的目的。例如

val part1 = df1.repartition(df1("key1")).sortWithinPartitions(df1("key1"))

以同样的方式,您可以为剩余的表进行几代分区,并将它们连接到在分区内排序的键上。

这将使连接“无随机播放”操作,但会带来大量计算成本。缓存数据帧(您可以对新创建的分区进行缓存操作)如果该操作将在后续时间执行,则性能会更好。希望这有帮助。

【讨论】:

以上是关于如何对数据进行物理分区以避免 Spark SQL 连接中的洗牌的主要内容,如果未能解决你的问题,请参考以下文章

如何避免Spark SQL做数据导入时产生大量小文件

Sql优化之Mysql表分区

Apache Spark 使用 SQL 函数 nTile 对数据进行分区

Spark如何仅在分区内加入

图文理解 Spark 3.0 的动态分区裁剪优化

Spark 优化 | 图文理解 Spark 3.0 的动态分区裁剪优化