有没有办法更改每个分区文件夹的输出行数?

Posted

技术标签:

【中文标题】有没有办法更改每个分区文件夹的输出行数?【英文标题】:Is there a way to change the number of output rows PER partition folder? 【发布时间】:2018-12-04 16:09:56 【问题描述】:

我有大量最终结果数据,这些数据在我感兴趣的列中分布不均。当我直接通过分区写入时,每个分区的文件数与 spark.sql.shuffle.partitions 相同。这导致拥挤分区中的每个文件都非常大(以 GB 为单位),但在其他一些分区中,文件大小非常小(甚至以 KB 为单位)。有没有办法改变每个分区的文件数?

例子:

+----------------------------+----------+
| number of rows in category | category |
+----------------------------+----------+
| 50000000000                |    A     |
| 200000                     |    B     |
| 30000                      |    C     |
+----------------------------+----------+

如果我这样做:

df.write.partitionBy("category").parquet(output_dir)

文件夹“A”中的文件很大,而“B”和“C”中的文件很小。

【问题讨论】:

【参考方案1】:

尝试使用多列重新分区数据框(如果可能并且对您的数据合乎逻辑)。

例子:

df.repartition("category", "<some_other_column_name>").write.partitionBy("category").parquet(output_dir)

【讨论】:

【参考方案2】:

我建议在数据帧上调用 df.repartition(NUM_PARTITIONS) 以在分区上均匀分布行。在您的情况下,对于类别 = A,与类别 C 相比,行将分布在更多数量的分区上。重新分区后,当您为类别 A 调用 write.partitionBy("category") 时,因为它分布在更多分区上,所以更多数量的文件将被写入(每个 A 类分区一个文件)。

NUM_PARTITIONS 可以是动态的,例如 NUM_PARTITIONS = df.count()%ROWS_PER_PARTITION。您可以根据每行的字节大小来决定多少 ROWS_PER_PARTITION。

NUM_PARTITIONS = 100 
df.repartition(NUM_PARTITIONS).write.partitionBy("category").parquet(output_dir)

如果要查看分区是如何分布的,可以使用这个

import pyspark.sql.functions as f
df.withColumn("partition_id",f.spark_partition_id()).groupBy("partition_id").count().show()

更详细的讨论,你看这个Spark SQL - Difference between df.repartition and DataFrameWriter partitionBy?

【讨论】:

如果我错了,请纠正我,但我相信您的回答表明我可以通过数据框中的行数来更改分区数。所以如果我有大数据框,它可以动态增加。但是每个类别怎么可能有不同数量的分区呢?例如,我想要 A 类 10 个分区和 B 类 2 个分区。 无法按列值指定分区数。但是它的重新分区的副作用。如果 Category=A 的行太大而无法放入单个分区,则会溢出到另一个分区。而较小的计数将适合单个分区。 我试过了,但没有帮助。我相信这是因为到达组中有足够的行数,因此每个分区至少有来自每个类别的一行。对于最小的 C 组,30000 对 100 个分区... 尝试同时指定分区数和类别列。 df.repartition(NUM_PARTITIONS, "category")

以上是关于有没有办法更改每个分区文件夹的输出行数?的主要内容,如果未能解决你的问题,请参考以下文章

有没有办法在 Xcode 9 中更改每个方案的 .xcconfig 文件?

python把数据写入文件,规定每个文件只有固定行数

有没有办法使用 Perl 脚本更改 Windows 文件夹图标?

从非分区表创建分区 hive 表

linux head

在 PySpark 中读取文本文件时有没有办法控制分区数