如何在 Spark 中写入 CSV
Posted
技术标签:
【中文标题】如何在 Spark 中写入 CSV【英文标题】:How to write to CSV in Spark 【发布时间】:2014-06-25 00:41:47 【问题描述】:我正在尝试找到一种将 Spark Job 的结果保存为 csv 文件的有效方法。我正在使用 Spark 和 Hadoop,到目前为止,我的所有文件都保存为 part-00000
。
任何想法如何使我的火花保存到具有指定文件名的文件?
【问题讨论】:
How to write the resulting RDD to a csv file in Spark python的可能重复 @gsamaras 考虑到时间,这个问题可能与这个问题重复:] 内容比时间更重要,但没关系。你提出了一个很好的问题,这就是为什么我也投了赞成票! :) Write single CSV file using spark-csv的可能重复 【参考方案1】:由于 Spark 使用 Hadoop File System API 将数据写入文件,这是不可避免的。如果你这样做了
rdd.saveAsTextFile("foo")
它将保存为“foo/part-XXXXX
”,在您尝试保存的 RDD 中的每个分区中都有一个 part-* 文件。将 RDD 中的每个分区写入一个单独的文件的原因是为了容错。如果写入第三个分区(即part-00002
)的任务失败,Spark 只需重新运行任务并覆盖部分写入/损坏的part-00002
,对其他部分没有影响。如果它们都写入同一个文件,那么恢复单个任务失败的难度要大得多。
part-XXXXX
文件通常不是问题,如果您打算在基于 Spark / Hadoop 的框架中再次使用它,因为它们都使用 HDFS API,如果您要求它们读取“foo”,它们都会读取foo 中的所有part-XXXXX
文件也是如此。
【讨论】:
谢谢你,这是非常好的解释,完全回答了我的问题。 然后您可以使用 hdfs 合并命令将它们打包到一个文件中:hdfs dfs -getmerge <src-directory> <dst-file>
如果结果数据很小并且单个输出文件很方便,您可以随时使用repartition(1)
将数据重新分区到单个输出文件中。这只适用于小数据,例如,当您想将 CSV 交给分析师以在 Excel 中查看时。
也可以查看coalesce(1)
答案@MFARID 保存了一个随机播放步骤。【参考方案2】:
我建议这样做(Java 示例):
theRddToPrint.coalesce(1, true).saveAsTextFile(textFileName);
FileSystem fs = anyUtilClass.getHadoopFileSystem(rootFolder);
FileUtil.copyMerge(
fs, new Path(textFileName),
fs, new Path(textFileNameDestiny),
true, fs.getConf(), null);
【讨论】:
很好的答案,包括所需的详细信息。【参考方案3】:将 Tathagata Das 答案扩展到 Spark 2.x 和 Scala 2.11
使用 Spark SQL,我们可以在一个班轮中做到这一点
//implicits for magic functions like .toDf
import spark.implicits._
val df = Seq(
("first", 2.0),
("choose", 7.0),
("test", 1.5)
).toDF("name", "vals")
//write DataFrame/DataSet to external storage
df.write
.format("csv")
.save("csv/file/location")
然后你可以继续adoalonso的回答。
【讨论】:
【参考方案4】:我有一个想法,但没有准备好代码 sn-p。 Spark 在内部(顾名思义)使用 Hadoop 输出格式。 (以及从 HDFS 读取时的InputFormat
)。
在hadoop的FileOutputFormat
中有一个受保护的成员setOutputFormat
,你可以从继承的类中调用它来设置其他的基名。
【讨论】:
【参考方案5】:这并不是一个真正干净的解决方案,但在 foreachRDD
() 中你基本上可以做任何你喜欢的事情,也可以创建一个新文件。
在我的解决方案中,这就是我所做的:我将输出保存在 HDFS 上(出于容错原因),并在 foreachRDD
内我还在本地文件夹中创建了一个带有统计信息的 TSV 文件。
如果你需要的话,我认为你也可以这样做。
http://spark.apache.org/docs/0.9.1/streaming-programming-guide.html#output-operations
【讨论】:
以上是关于如何在 Spark 中写入 CSV的主要内容,如果未能解决你的问题,请参考以下文章
在pyspark(2.2.0)中将CSV文件写入AWS时如何分配访问控制列表(ACL)?
无法使用 spark(sqlContext) 在 aws redshift 中写入 csv 数据
无法在 Spark 中将文件写入 Zeppelin 上的远程 hdfs
在 R 和 Sparklyr 中,将表写入 .CSV (spark_write_csv) 会产生许多文件,而不是一个文件。为啥?我可以改变吗?