有没有办法更改每个分区文件夹的输出行数?
Posted
技术标签:
【中文标题】有没有办法更改每个分区文件夹的输出行数?【英文标题】:Is there a way to change the number of output rows PER partition folder? 【发布时间】:2018-12-04 16:09:56 【问题描述】:我有大量最终结果数据,这些数据在我感兴趣的列中分布不均。当我直接通过分区写入时,每个分区的文件数与 spark.sql.shuffle.partitions 相同。这导致拥挤分区中的每个文件都非常大(以 GB 为单位),但在其他一些分区中,文件大小非常小(甚至以 KB 为单位)。有没有办法改变每个分区的文件数?
例子:
+----------------------------+----------+
| number of rows in category | category |
+----------------------------+----------+
| 50000000000 | A |
| 200000 | B |
| 30000 | C |
+----------------------------+----------+
如果我这样做:
df.write.partitionBy("category").parquet(output_dir)
文件夹“A”中的文件很大,而“B”和“C”中的文件很小。
【问题讨论】:
【参考方案1】:尝试使用多列重新分区数据框(如果可能并且对您的数据合乎逻辑)。
例子:
df.repartition("category", "<some_other_column_name>").write.partitionBy("category").parquet(output_dir)
【讨论】:
【参考方案2】:我建议在数据帧上调用 df.repartition(NUM_PARTITIONS)
以在分区上均匀分布行。在您的情况下,对于类别 = A,与类别 C 相比,行将分布在更多数量的分区上。重新分区后,当您为类别 A 调用 write.partitionBy("category")
时,因为它分布在更多分区上,所以更多数量的文件将被写入(每个 A 类分区一个文件)。
NUM_PARTITIONS 可以是动态的,例如 NUM_PARTITIONS = df.count()%ROWS_PER_PARTITION
。您可以根据每行的字节大小来决定多少 ROWS_PER_PARTITION。
NUM_PARTITIONS = 100
df.repartition(NUM_PARTITIONS).write.partitionBy("category").parquet(output_dir)
如果要查看分区是如何分布的,可以使用这个
import pyspark.sql.functions as f
df.withColumn("partition_id",f.spark_partition_id()).groupBy("partition_id").count().show()
更详细的讨论,你看这个Spark SQL - Difference between df.repartition and DataFrameWriter partitionBy?
【讨论】:
如果我错了,请纠正我,但我相信您的回答表明我可以通过数据框中的行数来更改分区数。所以如果我有大数据框,它可以动态增加。但是每个类别怎么可能有不同数量的分区呢?例如,我想要 A 类 10 个分区和 B 类 2 个分区。 无法按列值指定分区数。但是它的重新分区的副作用。如果 Category=A 的行太大而无法放入单个分区,则会溢出到另一个分区。而较小的计数将适合单个分区。 我试过了,但没有帮助。我相信这是因为到达组中有足够的行数,因此每个分区至少有来自每个类别的一行。对于最小的 C 组,30000 对 100 个分区... 尝试同时指定分区数和类别列。 df.repartition(NUM_PARTITIONS, "category")以上是关于有没有办法更改每个分区文件夹的输出行数?的主要内容,如果未能解决你的问题,请参考以下文章
有没有办法在 Xcode 9 中更改每个方案的 .xcconfig 文件?