使用 pyspark 对 parquet 文件进行分区和重新分区

Posted

技术标签:

【中文标题】使用 pyspark 对 parquet 文件进行分区和重新分区【英文标题】:partitioning and re-partittioning parquet files using pyspark 【发布时间】:2020-07-14 01:58:38 【问题描述】:

我有一个正在尝试解决的镶木地板分区问题。我在这个网站和网络上阅读了很多关于分区的材料,但仍然无法解释我的问题。

第 1 步:我有一个大型数据集 (~2TB),其中包含 MODULEDATE 列,并由由 86 days 组成的 DATE 分区。每个DATE 分区都有21 文件,因此总共有86 * 21 = 1806 文件。

第 2 步:我需要根据 MODULE 列聚合数据,因此我将其加载保存为另一个由 MODULE 分区的镶木地板。有9 模块,每个模块都有来自所有86 天的数据,因此生成的镶木地板有9 * 1806 = 16254 文件。

第 3 步 我通过 for 循环加载了每个 MODULE 分区,执行了我的聚合,并将其保存为附加模式的文件夹,这样我就有 9 个模块作为文件夹:@987654334 @、s3://path/MODULE B 等。 它们不按模块分区,而是保存为文件夹。由于我的默认 spark numpartitions 是 201,因此每个模块文件夹都有 201 文件,因此总共有 9 * 201 = 1809 文件

第 4 步 到目前为止一切顺利,但我需要将其按DATE 重新分区。所以我遍历了每个MODULE 分区并将文件保存为一个没有任何分区的parquet 文件。这导致总共有2751 个文件。我不知道这是怎么计算出来的。

第 5 步 然后我加载了整个未分区的文件并将其保存为 DATE 分区。这导致了大约39k 文件,每个文件大约1.5MB。所以我有大量的小文件,加载镶木地板或对它们进行任何类型的操作(例如groupBy 等)需要很长时间。

在阅读了更多内容后,我尝试在 步骤 4 中使用repartition(1).partitionBy('DATE') 来减少文件数量,但最终失败了。 从 第 4 步 开始,我就知道自己做错了。有没有更有效的方法来完成整个事情?

谢谢

【问题讨论】:

既然你知道你86 days的数据,你为什么不能在第4步把它重新分区为86,也许像df.repartition(86, "DATE") 与pyspark无关 @thebluephantom 我添加了 pyspark 标记,因为我在 pyspark 中进行编码,以便在 pyspark 中提供任何潜在的解决方案。 @ SomeshwarKale,我尝试这样做并在作业运行超过 8 小时左右后不断得到失败的结果。关于我做错了什么的任何见解? 这是一个轻微的下降,因为它是整个框架 afaik 的共同点 【参考方案1】:

找到正确数量的分区是您的关注点。

假设您有 86 天的数据,并且您想按日期分区保存它。然后你应该知道要在一个分区下创建多少个文件。

假设您每个日期有 3 GB 数据,那么您可能希望每个日期文件夹中至少有 6 个文件。

你可以做到这一点

df.repartition(6,'date').write.partitionBy('date')...

现在,如果您想限制每个文件中的记录数,请使用该属性

df.repartition(6, 'date').write.option("maxRecordsPerFile", 10000).partitionBy('date')...

【讨论】:

repartition(col, numpartitions) 还是repartition(numpartitions, col) 我试过了,但运行了大约 8 小时后它失败了。失败原因是:Stage cancelled because SparkContext was shut down。是不是每个文件都太大了? 尝试增加每个分区中的文件数,因为这将允许数据分散在多个工作人员上

以上是关于使用 pyspark 对 parquet 文件进行分区和重新分区的主要内容,如果未能解决你的问题,请参考以下文章

在 parquet 文件顶部从 pyspark 代码创建一个表

pyspark write.parquet() 创建一个文件夹而不是 parquet 文件

Parquet 文件上 groupby 的最佳实践

Pyspark:无法从 SparkFiles 读取镶木地板文件

读取 PySpark 中的所有分区 parquet 文件

Pyspark Parquet - 重新分区后排序