如何在 Spark 中写入 CSV

Posted

技术标签:

【中文标题】如何在 Spark 中写入 CSV【英文标题】:How to write to CSV in Spark 【发布时间】:2014-06-25 00:41:47 【问题描述】:

我正在尝试找到一种将 Spark Job 的结果保存为 csv 文件的有效方法。我正在使用 Spark 和 Hadoop,到目前为止,我的所有文件都保存为 part-00000

任何想法如何使我的火花保存到具有指定文件名的文件?

【问题讨论】:

How to write the resulting RDD to a csv file in Spark python的可能重复 @gsamaras 考虑到时间,这个问题可能与这个问题重复:] 内容比时间更重要,但没关系。你提出了一个很好的问题,这就是为什么我也投了赞成票! :) Write single CSV file using spark-csv的可能重复 【参考方案1】:

由于 Spark 使用 Hadoop File System API 将数据写入文件,这是不可避免的。如果你这样做了

rdd.saveAsTextFile("foo")

它将保存为“foo/part-XXXXX”,在您尝试保存的 RDD 中的每个分区中都有一个 part-* 文件。将 RDD 中的每个分区写入一个单独的文件的原因是为了容错。如果写入第三个分区(即part-00002)的任务失败,Spark 只需重新运行任务并覆盖部分写入/损坏的part-00002,对其他部分没有影响。如果它们都写入同一个文件,那么恢复单个任务失败的难度要大得多。

part-XXXXX 文件通常不是问题,如果您打算在基于 Spark / Hadoop 的框架中再次使用它,因为它们都使用 HDFS API,如果您要求它们读取“foo”,它们都会读取foo 中的所有part-XXXXX 文件也是如此。

【讨论】:

谢谢你,这是非常好的解释,完全回答了我的问题。 然后您可以使用 hdfs 合并命令将它们打包到一个文件中:hdfs dfs -getmerge <src-directory> <dst-file> 如果结果数据很小并且单个输出文件很方便,您可以随时使用repartition(1) 将数据重新分区到单个输出文件中。这只适用于小数据,例如,当您想将 CSV 交给分析师以在 Excel 中查看时。 也可以查看coalesce(1) 答案@MFARID 保存了一个随机播放步骤。【参考方案2】:

我建议这样做(Java 示例):

theRddToPrint.coalesce(1, true).saveAsTextFile(textFileName);
FileSystem fs = anyUtilClass.getHadoopFileSystem(rootFolder);
FileUtil.copyMerge(
    fs, new Path(textFileName),
    fs, new Path(textFileNameDestiny),
    true, fs.getConf(), null);

【讨论】:

很好的答案,包括所需的详细信息。【参考方案3】:

将 Tathagata Das 答案扩展到 Spark 2.x 和 Scala 2.11

使用 Spark SQL,我们可以在一个班轮中做到这一点

//implicits for magic functions like .toDf
import spark.implicits._

val df = Seq(
  ("first", 2.0),
  ("choose", 7.0),
  ("test", 1.5)
).toDF("name", "vals")

//write DataFrame/DataSet to external storage
df.write
  .format("csv")
  .save("csv/file/location")

然后你可以继续adoalonso的回答。

【讨论】:

【参考方案4】:

我有一个想法,但没有准备好代码 sn-p。 Spark 在内部(顾名思义)使用 Hadoop 输出格式。 (以及从 HDFS 读取时的InputFormat)。

在hadoop的FileOutputFormat中有一个受保护的成员setOutputFormat,你可以从继承的类中调用它来设置其他的基名。

【讨论】:

【参考方案5】:

这并不是一个真正干净的解决方案,但在 foreachRDD() 中你基本上可以做任何你喜欢的事情,也可以创建一个新文件。

在我的解决方案中,这就是我所做的:我将输出保存在 HDFS 上(出于容错原因),并在 foreachRDD 内我还在本地文件夹中创建了一个带有统计信息的 TSV 文件。

如果你需要的话,我认为你也可以这样做。

http://spark.apache.org/docs/0.9.1/streaming-programming-guide.html#output-operations

【讨论】:

以上是关于如何在 Spark 中写入 CSV的主要内容,如果未能解决你的问题,请参考以下文章

如何在Spark Scala中以CSV格式编写不同的布局

如何在写入hive orc表时合并spark中的小文件

在pyspark(2.2.0)中将CSV文件写入AWS时如何分配访问控制列表(ACL)?

无法使用 spark(sqlContext) 在 aws redshift 中写入 csv 数据

无法在 Spark 中将文件写入 Zeppelin 上的远程 hdfs

在 R 和 Sparklyr 中,将表写入 .CSV (spark_write_csv) 会产生许多文件,而不是一个文件。为啥?我可以改变吗?