如何将数据从 Spark SQL 导出到 CSV

Posted

技术标签:

【中文标题】如何将数据从 Spark SQL 导出到 CSV【英文标题】:How to export data from Spark SQL to CSV 【发布时间】:2015-11-03 10:53:14 【问题描述】:

此命令适用于 HiveQL:

insert overwrite directory '/data/home.csv' select * from testtable;

但在使用 Spark SQL 时,我收到了 org.apache.spark.sql.hive.HiveQl 堆栈跟踪错误:

java.lang.RuntimeException: Unsupported language features in query:
    insert overwrite directory '/data/home.csv' select * from testtable

请指导我在 Spark SQL 中编写导出到 CSV 功能。

【问题讨论】:

这个问题/答案不能解决 Spark 2.x 的问题...真正的问题是导出为标准 CSV 格式。请answer here. 【参考方案1】:

错误消息表明这不是查询语言中支持的功能。但是您可以像往常一样通过 RDD 接口 (df.rdd.saveAsTextFile) 以任何格式保存 DataFrame。或者您可以查看https://github.com/databricks/spark-csv。

【讨论】:

scala> df.write.format("com.databricks.spark.csv").save("/data/home.csv") :18: error: value write is not org.apache.spark.sql.SchemaRDD 的成员我是否需要再次使用 databricks 包构建当前 jar? DataFrame.write 已添加到 Apache Spark 1.4.0。【参考方案2】:

您可以使用以下语句将数据框的内容写入 CSV 格式 df.write.csv("/data/home/csv")

如果您需要将整个数据框写入单个 CSV 文件,请使用 df.coalesce(1).write.csv("/data/home/sample.csv")

对于spark 1.x,您可以使用spark-csv将结果写入CSV文件

低于 scala sn-p 会有所帮助

import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.write.format("com.databricks.spark.csv").save("/data/home/csv")

将内容写入单个文件

import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.coalesce(1).write.format("com.databricks.spark.csv").save("/data/home/sample.csv")

【讨论】:

我尝试了你提到的合并。它在指定路径创建一个目录,其中包含一个“part”文件和一个名为“_SUCCESS”的文件。你知道一种实际上只获取一个文件的方法吗? 不,我认为没有办法。 不是本地文件而是hdfs文件 我在此代码中发现了一个错误,与此代码生成的单个 csv 相比,我的带有 csv 分区的原始目录多出 1 列。我知道代码适用于微不足道的情况,但我最后两列的格式为concat('"', concat_ws(",", collect_list(some_column)), '"'),在插入覆盖时效果很好,但当我选择所有列并写入这种格式时,即使标题是正确的,但它错误地识别了倒数第二列值填充两者并忽略其余部分 这是我的 csv 分区在 "USR",0,0,""css","shell","html","python","javascript"","381534,3960,1683,229869,1569090" 之前的样子,这是它们现在的样子 "\"USR\"",0,0,"\"\"css\"","\"shell\""【参考方案3】:

最简单的方法是映射DataFrame的RDD并使用mkString:

  df.rdd.map(x=>x.mkString(","))

从 Spark 1.5 开始(甚至在此之前) df.map(r=>r.mkString(",")) 也会这样做 如果你想要 CSV 转义,你可以使用 apache commons lang。例如这是我们正在使用的代码

 def DfToTextFile(path: String,
                   df: DataFrame,
                   delimiter: String = ",",
                   csvEscape: Boolean = true,
                   partitions: Int = 1,
                   compress: Boolean = true,
                   header: Option[String] = None,
                   maxColumnLength: Option[Int] = None) = 

    def trimColumnLength(c: String) = 
      val col = maxColumnLength match 
        case None => c
        case Some(len: Int) => c.take(len)
      
      if (csvEscape) StringEscapeUtils.escapeCsv(col) else col
    
    def rowToString(r: Row) = 
      val st = r.mkString("~-~").replaceAll("[\\pC|\\uFFFD]", "") //remove control characters
      st.split("~-~").map(trimColumnLength).mkString(delimiter)
    

    def addHeader(r: RDD[String]) = 
      val rdd = for (h <- header;
                     if partitions == 1; //headers only supported for single partitions
                     tmpRdd = sc.parallelize(Array(h))) yield tmpRdd.union(r).coalesce(1)
      rdd.getOrElse(r)
    

    val rdd = df.map(rowToString).repartition(partitions)
    val headerRdd = addHeader(rdd)

    if (compress)
      headerRdd.saveAsTextFile(path, classOf[GzipCodec])
    else
      headerRdd.saveAsTextFile(path)
  

【讨论】:

虽然这是最简单的答案(也是一个很好的答案),但如果您的文本中有双引号,则必须考虑它们。 为表scala创建RDD后简单地得到错误> df.rdd.map(x=>x.mkString(",")); :18: error: value rdd is not a member of org.apache.spark.sql.SchemaRDD df.rdd.map(x=>x.mkString(","));【参考方案4】:

上面关于 spark-csv 的答案是正确的,但存在一个问题——该库根据数据框分区创建了多个文件。这不是我们通常需要的。因此,您可以将所有分区合并为一个:

df.coalesce(1).
    write.
    format("com.databricks.spark.csv").
    option("header", "true").
    save("myfile.csv")

并将库的输出(名称“part-00000”)重命名为所需的文件名。

这篇博文提供了更多详细信息:https://fullstackml.com/2015/12/21/how-to-export-data-frame-from-apache-spark/

【讨论】:

应该是 df.repartition.write 而不是 df.write.repartition ? @Cedric 你是对的,谢谢!先重新分区!已编辑。 如果希望继续写入现有文件,也可以添加模型。 resultDF.repartition(1).write.mode("append").format("com.databricks.spark.csv").option("header", "true").save("s3://...") coalesce(1) 要求数据集适合单台机器的堆,并且在处理大型数据集时很可能会导致问题 @DmitryPetrov 我们是否需要在包含合并选项时提及 write.format("com...") 选项?【参考方案5】:

由于 Spark 2.X spark-csv 被集成为 native datasource。因此,必要的语句简化为 (windows)

df.write
  .option("header", "true")
  .csv("file:///C:/out.csv")

或 UNIX

df.write
  .option("header", "true")
  .csv("/var/out.csv")

注意:正如 cmets 所说,它使用该名称创建目录,其中包含分区,而不是 standard CSV file。但是,这很可能是您想要的,因为否则您的驱动程序会崩溃(内存不足),或者您可能正在使用非分布式环境。

【讨论】:

大家好,有没有办法替换文件,因为它尝试重写文件时失败。 当然! .mode("overwrite").csv("/var/out.csv") 在 Spark 2.x 中,它使用该名称创建目录。有什么帮助吗? 我猜你的分区在那个目录里面。 但是它不是标准的 CSV 文件,它正在生成一个包含奇怪文件的文件夹 (!)。见***.com/q/58142220/287948【参考方案6】:

借助 spark-csv,我们可以写入 CSV 文件。

val dfsql = sqlContext.sql("select * from tablename")
dfsql.write.format("com.databricks.spark.csv").option("header","true").save("output.csv")`

【讨论】:

不,它不是真正的 CSV 文件,结果 output.csv 是一个文件夹。【参考方案7】:

在此处在 DATAFRAME 中输入代码:

val p=spark.read.format("csv").options(Map("header"->"true","delimiter"->"^")).load("filename.csv")

【讨论】:

以上是关于如何将数据从 Spark SQL 导出到 CSV的主要内容,如果未能解决你的问题,请参考以下文章

PostgreSQL:将结果数据从 SQL 查询导出到 Excel/CSV

Spark-SQL:如何将 TSV 或 CSV 文件读入数据框并应用自定义模式?

php 将数据从SQL导出到CSV

将数据从 SQL Server Express 导出到 CSV(需要引用和转义)

将数据从 oracle sql plus 导出到 csv 时,Hypen 转换为问号

如何将 PostgreSQL 查询输出导出到 csv 文件