使用 pyspark 对 parquet 文件进行分区和重新分区
Posted
技术标签:
【中文标题】使用 pyspark 对 parquet 文件进行分区和重新分区【英文标题】:partitioning and re-partittioning parquet files using pyspark 【发布时间】:2020-07-14 01:58:38 【问题描述】:我有一个正在尝试解决的镶木地板分区问题。我在这个网站和网络上阅读了很多关于分区的材料,但仍然无法解释我的问题。
第 1 步:我有一个大型数据集 (~2TB),其中包含 MODULE
和 DATE
列,并由由 86 days
组成的 DATE
分区。每个DATE
分区都有21
文件,因此总共有86 * 21 = 1806
文件。
第 2 步:我需要根据 MODULE
列聚合数据,因此我将其加载保存为另一个由 MODULE
分区的镶木地板。有9
模块,每个模块都有来自所有86
天的数据,因此生成的镶木地板有9 * 1806 = 16254
文件。
第 3 步 我通过 for 循环加载了每个 MODULE
分区,执行了我的聚合,并将其保存为附加模式的文件夹,这样我就有 9 个模块作为文件夹:@987654334 @、s3://path/MODULE B
等。
它们不按模块分区,而是保存为文件夹。由于我的默认 spark numpartitions 是 201
,因此每个模块文件夹都有 201
文件,因此总共有 9 * 201 = 1809
文件
第 4 步 到目前为止一切顺利,但我需要将其按DATE
重新分区。所以我遍历了每个MODULE
分区并将文件保存为一个没有任何分区的parquet 文件。这导致总共有2751
个文件。我不知道这是怎么计算出来的。
第 5 步 然后我加载了整个未分区的文件并将其保存为 DATE
分区。这导致了大约39k
文件,每个文件大约1.5MB。所以我有大量的小文件,加载镶木地板或对它们进行任何类型的操作(例如groupBy
等)需要很长时间。
在阅读了更多内容后,我尝试在 步骤 4 中使用repartition(1).partitionBy('DATE')
来减少文件数量,但最终失败了。
从 第 4 步 开始,我就知道自己做错了。有没有更有效的方法来完成整个事情?
谢谢
【问题讨论】:
既然你知道你86 days
的数据,你为什么不能在第4步把它重新分区为86,也许像df.repartition(86, "DATE")
与pyspark无关
@thebluephantom 我添加了 pyspark 标记,因为我在 pyspark 中进行编码,以便在 pyspark 中提供任何潜在的解决方案。 @ SomeshwarKale,我尝试这样做并在作业运行超过 8 小时左右后不断得到失败的结果。关于我做错了什么的任何见解?
这是一个轻微的下降,因为它是整个框架 afaik 的共同点
【参考方案1】:
找到正确数量的分区是您的关注点。
假设您有 86 天的数据,并且您想按日期分区保存它。然后你应该知道要在一个分区下创建多少个文件。
假设您每个日期有 3 GB 数据,那么您可能希望每个日期文件夹中至少有 6 个文件。
你可以做到这一点
df.repartition(6,'date').write.partitionBy('date')...
现在,如果您想限制每个文件中的记录数,请使用该属性
df.repartition(6, 'date').write.option("maxRecordsPerFile", 10000).partitionBy('date')...
【讨论】:
是repartition(col, numpartitions)
还是repartition(numpartitions, col)
?
我试过了,但运行了大约 8 小时后它失败了。失败原因是:Stage cancelled because SparkContext was shut down
。是不是每个文件都太大了?
尝试增加每个分区中的文件数,因为这将允许数据分散在多个工作人员上以上是关于使用 pyspark 对 parquet 文件进行分区和重新分区的主要内容,如果未能解决你的问题,请参考以下文章
在 parquet 文件顶部从 pyspark 代码创建一个表
pyspark write.parquet() 创建一个文件夹而不是 parquet 文件