如何避免pyspark中加入操作中的过度洗牌?

Posted

技术标签:

【中文标题】如何避免pyspark中加入操作中的过度洗牌?【英文标题】:How to avoid excessive shuffles in join operation in pyspark? 【发布时间】:2020-05-11 11:13:11 【问题描述】:

我有一个大小约为 25 GB 的大型 spark 数据帧,我必须与另一个大小约为 15 GB 的数据帧连接。

现在,当我运行代码时,大约需要 15 分钟才能完成

资源分配是 40 个执行器,每个 128 GB 内存

当我完成它的执行计划时,正在执行排序合并连接。

问题是:

在相同的键但不同的表上执行连接大约 5 到 6 次,因为在为每个执行的连接合并/连接数据之前,它花费了大部分时间对数据进行排序和共同定位分区。

那么有没有办法在执行连接之前对数据进行排序,从而不对每个连接执行排序操作,或者以这样的方式进行优化,以减少排序时间和实际连接数据的时间?

我只想在执行连接之前对我的数据框进行排序,但不知道该怎么做?

例如:

如果我的数据框加入 id 列

joined_df = df1.join(df2,df1.id==df2.id)

如何在加入之前根据“id”对数据帧进行排序,以便分区位于同一位置?

【问题讨论】:

***.com/questions/42985178/… 您可以将它们转换为 rdd,然后在进行连接之前使用相同的分区器。 【参考方案1】:

那么有没有办法在执行连接之前对数据进行排序,从而不对每个连接执行排序操作,或者以这样的方式进行优化,以减少排序时间和实际连接数据的时间?

闻起来像桶。

分桶是一种优化技术,它使用桶(和分桶列)来确定数据分区并避免数据混洗。

这个想法是 bucketBy 数据集,以便 Spark 知道键位于同一位置(已经预洗牌)。参与连接的 DataFrame 中的桶数和桶列数必须相同。

请注意,Hive 或 Spark 表 (saveAsTable) 支持这一点,因为存储桶元数据是从元存储(Spark 或 Hive)中获取的。

【讨论】:

如果我们将数据保存在 s3 上,我们可以得到桶信息吗? 这是我的理解,因为存储桶信息位于元存储而不是文件系统中。 我的用例包括从 s3 读取数据,所以如果我能以某种方式排除过度洗牌。请帮忙。我浏览了你的大部分博客,它们都很棒。谢谢【参考方案2】:

过去,我通过连接列对输入数据帧进行重新分区,取得了不错的效果。虽然这并不能避免洗牌,但它确实使洗牌明确,允许您选择专门用于连接的分区数量(而不是设置spark.sql.shuffle.partitions,这将适用于所有连接)。

如果您需要在多个作业中多次读取数据集,而写入持久性存储的成本得到回报,则分桶是一种有用的技术。

【讨论】:

重新分区本身是一项昂贵的操作,但我会试一试...谢谢

以上是关于如何避免pyspark中加入操作中的过度洗牌?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 pyspark 中加入带有熊猫数据框的配置单元表?

在pyspark中加入2个表,多个条件,左连接?

将每个元素视为元组时,在 PySpark 中加入 2 个 RDD

在pyspark中加入具有相同列名的数据框

在 PySpark 中加入多个列

在 PySpark 中加入 270 列