pyspark数据帧到HDFS保存太多文件[重复]
Posted
技术标签:
【中文标题】pyspark数据帧到HDFS保存太多文件[重复]【英文标题】:pyspark dataframe to HDFS saving too many files [duplicate] 【发布时间】:2018-03-19 21:52:07 【问题描述】:我正在汇总数据并希望将结果保存在 HDFS 中。我的最终结果只有 6 行 2 列的数据。但是,当我将其保存到 HDFS 时,它会保存 200 多个文件;我假设这是某种类型的预处理。当我查看文件时,它们也都是空白的。
results = aggregate.filter(aggregate["count"] > 2500)
results.show()
+--------------+-----+
| c_ip|count|
+--------------+-----+
| 198.51.100.61| 2619|
| 203.0.113.33| 2635|
|198.51.100.211| 2668|
|198.51.100.121| 2723|
|198.51.100.176| 2518|
| 198.51.100.16| 2546|
+--------------+-----+
results.write.format("csv").save("/sparkcourse/results")
如何保存到 HDFS,以便获得一个包含这些结果的文件?显然,这些数据适合一个文件。
我尝试的另一件事是使用 .collect() 但随后它将我的数据变成了一个列表,并且无法使用该选项将任何内容放入 HDFS。
results = aggregate.filter(aggregate["count"] > 2500).collect()
【问题讨论】:
【参考方案1】:Spark 只要在处理过程中出现随机播放,就会将数据重新分区为 200 个分区。 只要需要将数据从一个节点传输到另一个节点或在执行器之间传输,就会发生混洗。因此,当您保存数据帧(已经有 200 个分区)时,会为每个分区创建 200 个文件并写入一些元数据文件。
所以解决您的问题的方法是使用coalesce(1)
函数,以便一个工作节点写入this answer 中提到的输出路径
results.coalesce(1).write.format("csv").save("/sparkcourse/results")
或者你可以使用repartition(1)
作为
results.repartition(1).write.format("csv").save("/sparkcourse/results")
【讨论】:
很好的解释!我将 .collect() 与 .coalesce 和 .repartition(1) 混为一谈。我认为,当我执行 .collect() 时,它会将所有内容收集到一个节点,但最终只是将我的结果变成了一个列表。 collect() 将驱动节点中的所有数据累积为一个集合。以上是关于pyspark数据帧到HDFS保存太多文件[重复]的主要内容,如果未能解决你的问题,请参考以下文章