Spark:按键重新分区输出

Posted

技术标签:

【中文标题】Spark:按键重新分区输出【英文标题】:Spark: repartition output by key 【发布时间】:2018-05-02 06:44:44 【问题描述】:

我正在尝试使用以下代码输出记录:

spark.createDataFrame(asRow, struct)
      .write
      .partitionBy("foo", "bar")
      .format("text")
      .save("/some/output-path")

数据很小的时候我没有问题。但是,当我处理 ~600GB 输入时,我正在编写大约 290k 个文件,其中包括每个分区的小文件。有没有办法控制每个分区的输出文件数量?因为我现在写了很多小文件,效果不好。

【问题讨论】:

你使用HDFS作为文件系统吗?如果是这样,您可以像这样合并文本:***.com/questions/42433869/…. 你可以看看这里(它适用于镶木地板,但应该适用于所有格式):***.com/questions/34789604/… 非常感谢@wind 和 Shaido 的回答。我正在用我的输入进行很多转换,这就是为什么我需要在 Spark 中编写和执行它。我现在面临的问题是每个分区有很多小文件。我们的 HDFS 集群的块大小为 128MB,因此最好每个分区 1 个文件接近或大于块大小。但目前我不知道是否有可用的数据框功能来做到这一点。 【参考方案1】:

拥有大量文件是预期的行为,因为每个分区(导致您在写入之前进行的任何计算)都会写入您请求相关文件的分区

如果您希望避免在写入之前需要重新分区:

spark.createDataFrame(asRow, struct)
      .repartition("foo","bar")
      .write
      .partitionBy("foo", "bar")
      .format("text")
      .save("/some/output-path")

【讨论】:

【参考方案2】:

每个分区有多个文件,因为每个节点都将输出写入自己的文件。这意味着每个分区只有一个文件的唯一方法是在写入之前重新分区数据。请注意,这将非常低效,因为数据重新分区会导致您的数据打乱。

【讨论】:

以上是关于Spark:按键重新分区输出的主要内容,如果未能解决你的问题,请参考以下文章

Spark DataFrame重新分区:未保留的分区数

Spark中的最佳重新分区方式

为啥在 Spark 中重新分区比 partitionBy 快?

Spark 重新分区执行器

Spark:持久化和重新分区顺序

我们如何在 Apache Spark 中执行动态重新分区?