使用 spark-csv 编写单个 CSV 文件

Posted

技术标签:

【中文标题】使用 spark-csv 编写单个 CSV 文件【英文标题】:Write single CSV file using spark-csv 【发布时间】:2015-10-18 21:41:51 【问题描述】:

我正在使用 https://github.com/databricks/spark-csv ,我正在尝试编写单个 CSV,但不能,它正在创建一个文件夹。

需要一个 Scala 函数,该函数将接受路径和文件名等参数并写入该 CSV 文件。

【问题讨论】:

【参考方案1】:

它正在创建一个包含多个文件的文件夹,因为每个分区都是单独保存的。如果您需要单个输出文件(仍在文件夹中),您可以repartition(如果上游数据很大,则首选,但需要随机播放):

df
   .repartition(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

coalesce:

df
   .coalesce(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

保存前的数据框:

所有数据都将写入mydata.csv/part-00000。在您使用此选项之前确保您了解正在发生的事情以及将所有数据传输给单个工作人员的成本是多少。如果您将分布式文件系统与复制一起使用,数据将被多次传输——首先获取到单个工作人员,然后分布在存储节点上。

或者,您可以保留您的代码,然后使用cat 或HDFS getmerge 等通用工具在之后简单地合并所有部分。

【讨论】:

你也可以使用合并:df.coalesce(1).write.format("com.databricks.spark.csv") .option("header", "true") .save(" mydata.csv") spark 1.6 在我们设置 .coalesce(1) 时会抛出错误,它会在 _temporary 目录上显示一些 FileNotFoundException。它仍然是 spark 中的一个错误:issues.apache.org/jira/browse/SPARK-2984 @Harsha 不太可能。而是coalesce(1) 非常昂贵且通常不实用的简单结果。 同意@zero323,但是如果您有特殊要求合并到一个文件中,考虑到您有足够的资源和时间,应该仍然可以。 @Harsha 我不是说没有。如果您正确调整 GC,它应该可以正常工作,但这只是浪费时间,而且很可能会损害整体性能。所以我个人认为没有任何理由打扰,特别是因为在 Spark 外部合并文件非常简单,根本不用担心内存使用。【参考方案2】:

如果您使用 HDFS 运行 Spark,我一直在通过正常写入 csv 文件并利用 HDFS 进行合并来解决问题。我直接在 Spark (1.6) 中这样做:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

def merge(srcPath: String, dstPath: String): Unit =  
   val hadoopConfig = new Configuration()
   val hdfs = FileSystem.get(hadoopConfig)
   FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
   // the "true" setting deletes the source files once they are merged into the new output



val newData = << create your dataframe >>


val outputfile = "/user/feeds/project/outputs/subject"  
var filename = "myinsights"
var outputFileName = outputfile + "/temp_" + filename 
var mergedFileName = outputfile + "/merged_" + filename
var mergeFindGlob  = outputFileName

    newData.write
        .format("com.databricks.spark.csv")
        .option("header", "false")
        .mode("overwrite")
        .save(outputFileName)
    merge(mergeFindGlob, mergedFileName )
    newData.unpersist()

不记得我在哪里学的这个技巧,但它可能对你有用。

【讨论】:

我没有尝试过 - 并且怀疑它可能不是直截了当的。 谢谢。我有 added an answer 在 Databricks 上工作 @Minkymorgan 我有类似的问题,但无法正确解决..你能看看这个问题***.com/questions/46812388/… @SUDARSHAN 我上面的函数适用于未压缩的数据。在您的示例中,我认为您在编写文件时使用了 gzip 压缩,然后尝试将这些文件合并在一起,但失败了。这是行不通的,因为您无法将 gzip 文件合并在一起。 Gzip 不是可拆分压缩算法,因此肯定不是“可合并的”。您可能会测试“snappy”或“bz2”压缩 - 但直觉是这在合并时也会失败。最好的方法可能是删除压缩,合并原始文件,然后使用可拆分的编解码器进行压缩。 如果我想保留标题怎么办?它为每个文件部分重复【参考方案3】:

我在这里玩游戏可能有点晚了,但是使用coalesce(1)repartition(1) 可能适用于小型数据集,但大型数据集将全部放入一个节点上的一个分区中。这可能会引发 OOM 错误,或者充其量是处理缓慢。

我强烈建议您使用 Hadoop API 中的 FileUtil.copyMerge() 函数。这会将输出合并到一个文件中。

EDIT - 这有效地将数据带到驱动程序而不是执行程序节点。 Coalesce() 如果单个 executor 的 RAM 比驱动程序多,那就没问题了。

编辑 2copyMerge() 在 Hadoop 3.0 中被删除。有关如何使用最新版本的更多信息,请参阅以下堆栈溢出文章:How to do CopyMerge in Hadoop 3.0?

【讨论】:

对如何以这种方式获取带有标题行的 csv 有任何想法吗?不想让文件产生一个标题,因为这会在整个文件中散布标题,每个分区一个。 这里有一个我过去使用过的选项:markhneedham.com/blog/2014/11/30/… @etspaceman 酷。不幸的是,我仍然没有真正的好方法来做到这一点,因为我需要能够在 Java(或 Spark,但以一种不消耗大量内存并且可以处理大文件的方式)中做到这一点.我仍然无法相信他们删除了这个 API 调用......这是一种非常常见的用法,即使 Hadoop 生态系统中的其他应用程序没有完全使用它。【参考方案4】:

如果您正在使用 Databricks 并且可以将所有数据放入一个工作人员的 RAM(因此可以使用 .coalesce(1)),您可以使用 dbfs 来查找并移动生成的 CSV 文件:

val fileprefix= "/mnt/aws/path/file-prefix"

dataset
  .coalesce(1)       
  .write             
//.mode("overwrite") // I usually don't use this, but you may want to.
  .option("header", "true")
  .option("delimiter","\t")
  .csv(fileprefix+".tmp")

val partition_path = dbutils.fs.ls(fileprefix+".tmp/")
     .filter(file=>file.name.endsWith(".csv"))(0).path

dbutils.fs.cp(partition_path,fileprefix+".tab")

dbutils.fs.rm(fileprefix+".tmp",recurse=true)

如果您的文件不适合工作人员的 RAM,您可能需要考虑 chaotic3quilibrium's suggestion to use FileUtils.copyMerge()。我还没有这样做,也不知道是否可行,例如在 S3 上。

这个答案是建立在以前对这个问题的答案以及我自己对提供的代码 sn-p 的测试之上的。 I originally posted it to Databricks 并在这里重新发布。

我找到的关于 dbfs 的 rm 递归选项的最佳文档位于 a Databricks forum。

【讨论】:

【参考方案5】:

spark 的 df.write() API 将在给定路径内创建多个部分文件...强制 spark 只写入单个部分文件使用 df.coalesce(1).write.csv(...) 而不是 df.repartition(1).write.csv(...) 因为合并是一个狭窄的转换,而重新分区是一个广泛的转换参见Spark - repartition() vs coalesce()

df.coalesce(1).write.csv(filepath,header=True) 

将在给定的文件路径中创建包含一个part-0001-...-c000.csv 文件的文件夹 使用

cat filepath/part-0001-...-c000.csv > filename_you_want.csv 

要有一个用户友好的文件名

【讨论】:

或者,如果数据帧不太大(~GBs 或可以放入驱动程序内存),您也可以使用df.toPandas().to_csv(path) 这将使用您喜欢的文件名写入单个 csv 呃,这只能通过转换为 pandas 来完成,令人沮丧。只写一个没有 UUID 的文件有多难? 如何覆盖它?它适用于写入但无法覆盖【参考方案6】:

我在 Python 中使用它来获取单个文件:

df.toPandas().to_csv("/tmp/my.csv", sep=',', header=True, index=False)

【讨论】:

这可能有效,但它不是内存高效方法,因为驱动程序必须将 Spark Dataframe 转换为 pandas。因此,如果数据不是太大,这可能是一个好方法。 使用较小的数据,它就像一个魅力 :-D 并且您的文件不是奇怪的格式 :D【参考方案7】:

此答案扩展了已接受的答案,提供了更多上下文,并提供了可以在您机器上的 Spark Shell 中运行的代码 sn-ps。

有关已接受答案的更多上下文

接受的答案可能会给您这样的印象,示例代码输出单个 mydata.csv 文件,但事实并非如此。让我们演示一下:

val df = Seq("one", "two", "three").toDF("num")
df
  .repartition(1)
  .write.csv(sys.env("HOME")+ "/Documents/tmp/mydata.csv")

这是输出的内容:

Documents/
  tmp/
    mydata.csv/
      _SUCCESS
      part-00000-b3700504-e58b-4552-880b-e7b52c60157e-c000.csv

注意mydata.csv 是已接受答案中的文件夹 - 它不是文件!

如何输出具有特定名称的单个文件

我们可以使用spark-daria 写出一个mydata.csv 文件。

import com.github.mrpowers.spark.daria.sql.DariaWriters
DariaWriters.writeSingleFile(
    df = df,
    format = "csv",
    sc = spark.sparkContext,
    tmpFolder = sys.env("HOME") + "/Documents/better/staging",
    filename = sys.env("HOME") + "/Documents/better/mydata.csv"
)

这将输出如下文件:

Documents/
  better/
    mydata.csv

S3 路径

您需要将 s3a 路径传递给 DariaWriters.writeSingleFile 才能在 S3 中使用此方法:

DariaWriters.writeSingleFile(
    df = df,
    format = "csv",
    sc = spark.sparkContext,
    tmpFolder = "s3a://bucket/data/src",
    filename = "s3a://bucket/data/dest/my_cool_file.csv"
)

请参阅here 了解更多信息。

避免复制合并

copyMerge 已从 Hadoop 3 中删除。DariaWriters.writeSingleFile 实现使用 fs.rename、as described here。 Spark 3 still used Hadoop 2,所以 copyMerge 实现将在 2020 年工作。我不确定 Spark 何时会升级到 Hadoop 3,但最好避免在 Spark 升级 Hadoop 时导致代码中断的任何 copyMerge 方法。

源代码

如果您想检查实现,请在 spark-daria 源代码中查找 DariaWriters 对象。

PySpark 实施

使用 PySpark 写出单个文件更容易,因为您可以将 DataFrame 转换为默认情况下作为单个文件写出的 Pandas DataFrame。

from pathlib import Path
home = str(Path.home())
data = [
    ("jellyfish", "JALYF"),
    ("li", "L"),
    ("luisa", "LAS"),
    (None, None)
]
df = spark.createDataFrame(data, ["word", "expected"])
df.toPandas().to_csv(home + "/Documents/tmp/mydata-from-pyspark.csv", sep=',', header=True, index=False)

限制

DariaWriters.writeSingleFile Scala 方法和df.toPandas() Python 方法仅适用于小型数据集。巨大的数据集不能写成单个文件。从性能的角度来看,将数据作为单个文件写入并不是最佳的,因为数据不能并行写入。

【讨论】:

嗨,1.0.0spark-daria 版本是否已发布到 maven repo?我没有看到它在那里可用。 @BandiKishore - 是的,这是链接:repo1.maven.org/maven2/com/github/mrpowers/spark-daria_2.12/…【参考方案8】:

适用于从 Minkymorgan 修改的 S3 的解决方案。

只需将临时分区目录路径(名称与最终路径不同)传递为 srcPath,将单个最终 csv/txt 传递为 destPath,如果要删除原始目录,还要指定 deleteSource

/**
* Merges multiple partitions of spark text file output into single file. 
* @param srcPath source directory of partitioned files
* @param dstPath output path of individual path
* @param deleteSource whether or not to delete source directory after merging
* @param spark sparkSession
*/
def mergeTextFiles(srcPath: String, dstPath: String, deleteSource: Boolean): Unit =  
  import org.apache.hadoop.fs.FileUtil
  import java.net.URI
  val config = spark.sparkContext.hadoopConfiguration
  val fs: FileSystem = FileSystem.get(new URI(srcPath), config)
  FileUtil.copyMerge(
    fs, new Path(srcPath), fs, new Path(dstPath), deleteSource, config, null
  )

【讨论】:

copyMerge 实现列出所有文件并对其进行迭代,这在 s3 中是不安全的。如果您编写文件然后列出它们 - 这并不能保证将列出所有文件。见[这个|docs.aws.amazon.com/AmazonS3/latest/dev/… @LiranBo,对不起,为什么这不能保证它会起作用。引用链接文档“一个进程将一个新对象写入 Amazon S3 并立即列出其存储桶中的键。新对象将出现在列表中。” 现在,2020年12月1日之前,s3不保证写一致性后的list。现在可以了 - link【参考方案9】:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.sql.DataFrame,SaveMode,SparkSession
import org.apache.spark.sql.functions._

我使用以下方法解决了(hdfs 重命名文件名):-

第 1 步:-(创建数据帧并写入 HDFS)

df.coalesce(1).write.format("csv").option("header", "false").mode(SaveMode.Overwrite).save("/hdfsfolder/blah/")

第 2 步:-(创建 Hadoop 配置)

val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)

Step3 :- (获取hdfs文件夹路径)

val pathFiles = new Path("/hdfsfolder/blah/")

Step4:- (从 hdfs 文件夹中获取 spark 文件名)

val fileNames = hdfs.listFiles(pathFiles, false)
println(fileNames)

setp5:- (创建 scala 可变列表以保存所有文件名并将其添加到列表中)

    var fileNamesList = scala.collection.mutable.MutableList[String]()
    while (fileNames.hasNext) 
      fileNamesList += fileNames.next().getPath.getName
    
    println(fileNamesList)

第6步:-(从文件名scala列表中过滤_SUCESS文件顺序)

    // get files name which are not _SUCCESS
    val partFileName = fileNamesList.filterNot(filenames => filenames == "_SUCCESS")

步骤 7:-(将 scala 列表转换为字符串并将所需文件名添加到 hdfs 文件夹字符串,然后应用重命名)

val partFileSourcePath = new Path("/yourhdfsfolder/"+ partFileName.mkString(""))
    val desiredCsvTargetPath = new Path(/yourhdfsfolder/+ "op_"+ ".csv")
    hdfs.rename(partFileSourcePath , desiredCsvTargetPath)

【讨论】:

【参考方案10】:

在保存之前重新分区/合并到 1 个分区(您仍然会得到一个文件夹,但其中会有一个部分文件)

【讨论】:

【参考方案11】:

你可以使用rdd.coalesce(1, true).saveAsTextFile(path)

它将数据作为单个文件存储在 path/part-00000 中

【讨论】:

【参考方案12】:
spark.sql("select * from df").coalesce(1).write.option("mode","append").option("header","true").csv("/your/hdfs/path/")

spark.sql("select * from df") --> 这是数据框

coalesce(1)repartition(1) --> 这将使您的输出文件仅成为 1 个部分文件

write --> 写入数据

option("mode","append") --> 将数据附加到现有目录

option("header","true") --> 启用标头

csv("&lt;hdfs dir&gt;") --> 写入为 CSV 文件及其在 HDFS 中的输出位置

【讨论】:

你也可以使用df.select("*"),但是如果使用HDFS,几乎所有的Hadoop工具都接受一个文件目录,所以最好让Spark分割文件以供将来并行文件读取【参考方案13】:

通过使用 Listbuffer,我们可以将数据保存到单个文件中:

import java.io.FileWriter
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer
    val text = spark.read.textFile("filepath")
    var data = ListBuffer[String]()
    for(line:String <- text.collect())
      data += line
    
    val writer = new FileWriter("filepath")
    data.foreach(line => writer.write(line.toString+"\n"))
    writer.close()

【讨论】:

【参考方案14】:
def export_csv(  
  fileName: String,
  filePath: String
  ) = 

  val filePathDestTemp = filePath + ".dir/"
  val merstageout_df = spark.sql(merstageout)

  merstageout_df
    .coalesce(1)
    .write
    .option("header", "true")
    .mode("overwrite")
    .csv(filePathDestTemp)
  
  val listFiles = dbutils.fs.ls(filePathDestTemp)

  for(subFiles <- listFiles)
      val subFiles_name: String = subFiles.name
      if (subFiles_name.slice(subFiles_name.length() - 4,subFiles_name.length()) == ".csv") 
        dbutils.fs.cp (filePathDestTemp + subFiles_name,  filePath + fileName+ ".csv")
        dbutils.fs.rm(filePathDestTemp, recurse=true)
       

【讨论】:

【参考方案15】:

还有另一种使用 Java 的方法

import java.io._

def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit) 
  
     val p = new java.io.PrintWriter(f);  
     try  op(p)  
     finally  p.close() 
   

printToFile(new File("C:/TEMP/df.csv"))  p => df.collect().foreach(p.println)

【讨论】:

name 'true' 未定义 我不会使用.collect() 作为“解决方案”

以上是关于使用 spark-csv 编写单个 CSV 文件的主要内容,如果未能解决你的问题,请参考以下文章

spark-csv 包中的 inferSchema

如何在 Apache Spark 预构建版本中添加任何新库,如 spark-csv

Spark:spark-csv 花费的时间太长

如何为 spark-csv 提供 parserLib 和 inferSchema 选项

如何从 pyspark 中的本地 jar 导入包?

在 Spark 本地模式下包含包