让 PySpark 每列值输出一个文件(重新分区/分区不工作)

Posted

技术标签:

【中文标题】让 PySpark 每列值输出一个文件(重新分区/分区不工作)【英文标题】:Get PySpark to output one file per column value (repartition / partitionBy not working) 【发布时间】:2021-09-24 19:36:16 【问题描述】:

我看到很多答案和 blob 帖子表明:

df.repartition('category').write().partitionBy('category')

将为每个类别输出一个文件,但如果 df 中唯一“类别”值的数量小于默认分区的数量(通常为 200),则这似乎不是真的。

当我在具有 100 个类别的文件上使用上述代码时,我最终会得到 100 个文件夹,每个文件夹包含 1 到 3 个“部分”文件,而不是在同一个“部分”中包含具有给定“类别”值的所有行”。 https://***.com/a/42780452/529618 的答案似乎可以解释这一点。

每个分区值只获取一个文件的最快方法是什么?


我尝试过的事情

我见过很多这样的说法

df.repartition(1, 'category').write().partitionBy('category')
df.repartition(2, 'category').write().partitionBy('category')

将分别创建“每个类别恰好一个文件”和“每个类别恰好两个文件”,但这似乎不是此参数的工作方式。 documentation 清楚地表明numPartitions 参数是要创建的分区总数,而不是每列值的分区数。根据该文档,将此参数指定为 1 应该(意外地)在写入文件时为每个分区输出一个文件,但可能只是因为它删除了所有并行性并强制您的整个 RDD 在单个节点上进行混洗/重新计算。

required_partitions = df.select('category').distinct().count()
df.repartition(required_partitions, 'category').write().partitionBy('category')

以上似乎是一种基于记录行为的解决方法,但由于多种原因,这种方法代价高昂。一方面,如果 df 昂贵且未缓存(和/或太大以至于仅为此目的缓存将是浪费的),则单独计数,并且数据帧的任何重新分区都可能导致多阶段工作流程中不必要的改组一路上有各种数据帧输出。

【问题讨论】:

【参考方案1】:

“最快”的方式可能取决于实际的硬件设置和实际数据(以防出现偏差)。据我所知,我也同意df.repartition('category').write().partitionBy('category') 无助于解决您的问题。

我们在应用程序中遇到了类似的问题,但我们没有先进行计数,然后再进行重新分区,而是将数据的写入和每个分区只有一个文件的要求分开到两个不同的 Spark 作业中。第一个作业被优化为写入数据。第二个作业只是遍历分区文件夹结构并简单地读取每个文件夹/分区的数据,将其数据合并到一个分区并将它们覆盖回来。同样,我不知道这是否也是您环境中最快的方法,但对我们来说它成功了。

在对该主题进行了一些研究后,Databricks 上的Auto Optimize Writes 功能可用于写入增量表。在这里,他们使用了类似的方法:首先写入数据,然后运行单独的 OPTIMIZE 作业以将文件聚合到单个文件中。在提到的链接中,您会找到以下解释:

“在单个写入之后,Azure Databricks 会检查文件是否可以进一步压缩,并运行 OPTIMIZE 作业 [...] 以进一步压缩具有最多小文件的分区的文件。”

附带说明:确保将配置 spark.sql.files.maxRecordsPerFile 保持为 0(默认值)或负数。否则,仅此配置可能会导致“类别”列中具有相同值的数据的多个文件。

【讨论】:

我认为这是一个很好的见解——让 spark 尽可能快地转储数据并创建一个单独的 spark 作业甚至是纯 python 任务来解决将结果文件合并/重命名为某物的问题更适合下游系统使用。【参考方案2】:

你可以试试coalesce(n); coalesce用于减少分区数,是repartition的优化版。

n = 要输出的分区数。

【讨论】:

我不清楚coalesce 在这种情况下如何提供帮助,我希望指定列中的每个值都有一个分区。您是否建议我遍历列中的所有不同值并将该值过滤到一个新的数据帧中,并将这些新数据帧中的每一个合并到一个分区?这似乎非常昂贵。

以上是关于让 PySpark 每列值输出一个文件(重新分区/分区不工作)的主要内容,如果未能解决你的问题,请参考以下文章

使用 pyspark 对 parquet 文件进行分区和重新分区

PySpark 重新分区 RDD 元素

Pyspark Parquet - 重新分区后排序

使用 pyspark 重新分区失败并出现错误

PySpark 根据特定列重新分区

SQL - 根据列值重新启动分区