如何避免pyspark中加入操作中的过度洗牌?
Posted
技术标签:
【中文标题】如何避免pyspark中加入操作中的过度洗牌?【英文标题】:How to avoid excessive shuffles in join operation in pyspark? 【发布时间】:2020-05-11 11:13:11 【问题描述】:我有一个大小约为 25 GB 的大型 spark 数据帧,我必须与另一个大小约为 15 GB 的数据帧连接。
现在,当我运行代码时,大约需要 15 分钟才能完成
资源分配是 40 个执行器,每个 128 GB 内存
当我完成它的执行计划时,正在执行排序合并连接。
问题是:
在相同的键但不同的表上执行连接大约 5 到 6 次,因为在为每个执行的连接合并/连接数据之前,它花费了大部分时间对数据进行排序和共同定位分区。
那么有没有办法在执行连接之前对数据进行排序,从而不对每个连接执行排序操作,或者以这样的方式进行优化,以减少排序时间和实际连接数据的时间?
我只想在执行连接之前对我的数据框进行排序,但不知道该怎么做?
例如:
如果我的数据框加入 id 列
joined_df = df1.join(df2,df1.id==df2.id)
如何在加入之前根据“id”对数据帧进行排序,以便分区位于同一位置?
【问题讨论】:
***.com/questions/42985178/… 您可以将它们转换为 rdd,然后在进行连接之前使用相同的分区器。 【参考方案1】:那么有没有办法在执行连接之前对数据进行排序,从而不对每个连接执行排序操作,或者以这样的方式进行优化,以减少排序时间和实际连接数据的时间?
闻起来像桶。
分桶是一种优化技术,它使用桶(和分桶列)来确定数据分区并避免数据混洗。
这个想法是 bucketBy
数据集,以便 Spark 知道键位于同一位置(已经预洗牌)。参与连接的 DataFrame 中的桶数和桶列数必须相同。
请注意,Hive 或 Spark 表 (saveAsTable
) 支持这一点,因为存储桶元数据是从元存储(Spark 或 Hive)中获取的。
【讨论】:
如果我们将数据保存在 s3 上,我们可以得到桶信息吗? 这是我的理解,因为存储桶信息位于元存储而不是文件系统中。 我的用例包括从 s3 读取数据,所以如果我能以某种方式排除过度洗牌。请帮忙。我浏览了你的大部分博客,它们都很棒。谢谢【参考方案2】:过去,我通过连接列对输入数据帧进行重新分区,取得了不错的效果。虽然这并不能避免洗牌,但它确实使洗牌明确,允许您选择专门用于连接的分区数量(而不是设置spark.sql.shuffle.partitions
,这将适用于所有连接)。
如果您需要在多个作业中多次读取数据集,而写入持久性存储的成本得到回报,则分桶是一种有用的技术。
【讨论】:
重新分区本身是一项昂贵的操作,但我会试一试...谢谢以上是关于如何避免pyspark中加入操作中的过度洗牌?的主要内容,如果未能解决你的问题,请参考以下文章