pyspark数据帧到HDFS保存太多文件[重复]

Posted

技术标签:

【中文标题】pyspark数据帧到HDFS保存太多文件[重复]【英文标题】:pyspark dataframe to HDFS saving too many files [duplicate] 【发布时间】:2018-03-19 21:52:07 【问题描述】:

我正在汇总数据并希望将结果保存在 HDFS 中。我的最终结果只有 6 行 2 列的数据。但是,当我将其保存到 HDFS 时,它会保存 200 多个文件;我假设这是某种类型的预处理。当我查看文件时,它们也都是空白的。

results = aggregate.filter(aggregate["count"] > 2500)

results.show()
+--------------+-----+
|          c_ip|count|
+--------------+-----+
| 198.51.100.61| 2619|
|  203.0.113.33| 2635|
|198.51.100.211| 2668|
|198.51.100.121| 2723|
|198.51.100.176| 2518|
| 198.51.100.16| 2546|
+--------------+-----+

results.write.format("csv").save("/sparkcourse/results")

如何保存到 HDFS,以便获得一个包含这些结果的文件?显然,这些数据适合一个文件。

我尝试的另一件事是使用 .collect() 但随后它将我的数据变成了一个列表,并且无法使用该选项将任何内容放入 HDFS。

results = aggregate.filter(aggregate["count"] > 2500).collect()

【问题讨论】:

【参考方案1】:

Spark 只要在处理过程中出现随机播放,就会将数据重新分区为 200 个分区只要需要将数据从一个节点传输到另一个节点或在执行器之间传输,就会发生混洗。因此,当您保存数据帧(已经有​​ 200 个分区)时,会为每个分区创建 200 个文件并写入一些元数据文件。

所以解决您的问题的方法是使用coalesce(1) 函数,以便一个工作节点写入this answer 中提到的输出路径

results.coalesce(1).write.format("csv").save("/sparkcourse/results")

或者你可以使用repartition(1)作为

results.repartition(1).write.format("csv").save("/sparkcourse/results")

【讨论】:

很好的解释!我将 .collect() 与 .coalesce 和 .repartition(1) 混为一谈。我认为,当我执行 .collect() 时,它会将所有内容收集到一个节点,但最终只是将我的结果变成了一个列表。 collect() 将驱动节点中的所有数据累积为一个集合。

以上是关于pyspark数据帧到HDFS保存太多文件[重复]的主要内容,如果未能解决你的问题,请参考以下文章

从 Pyspark 在 HDFS 中保存文件

将数据作为文本文件从 spark 保存到 hdfs

在 pyspark 中执行 NLTK

如何使用 pyspark 读取 hdfs kafka 数据?

将内部文件转换为数据帧到另一个数据帧或 RDD

如何将存储在 HDFS 中包含行的文本文件转换为 Pyspark 中的数据框?