如何将数据从 Spark SQL 导出到 CSV
Posted
技术标签:
【中文标题】如何将数据从 Spark SQL 导出到 CSV【英文标题】:How to export data from Spark SQL to CSV 【发布时间】:2015-11-03 10:53:14 【问题描述】:此命令适用于 HiveQL:
insert overwrite directory '/data/home.csv' select * from testtable;
但在使用 Spark SQL 时,我收到了 org.apache.spark.sql.hive.HiveQl
堆栈跟踪错误:
java.lang.RuntimeException: Unsupported language features in query:
insert overwrite directory '/data/home.csv' select * from testtable
请指导我在 Spark SQL 中编写导出到 CSV 功能。
【问题讨论】:
这个问题/答案不能解决 Spark 2.x 的问题...真正的问题是导出为标准 CSV 格式。请answer here. 【参考方案1】:错误消息表明这不是查询语言中支持的功能。但是您可以像往常一样通过 RDD 接口 (df.rdd.saveAsTextFile
) 以任何格式保存 DataFrame。或者您可以查看https://github.com/databricks/spark-csv。
【讨论】:
scala> df.write.format("com.databricks.spark.csv").save("/data/home.csv")DataFrame.write
已添加到 Apache Spark 1.4.0。【参考方案2】:
您可以使用以下语句将数据框的内容写入 CSV 格式
df.write.csv("/data/home/csv")
如果您需要将整个数据框写入单个 CSV 文件,请使用
df.coalesce(1).write.csv("/data/home/sample.csv")
对于spark 1.x,您可以使用spark-csv将结果写入CSV文件
低于 scala sn-p 会有所帮助
import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.write.format("com.databricks.spark.csv").save("/data/home/csv")
将内容写入单个文件
import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.coalesce(1).write.format("com.databricks.spark.csv").save("/data/home/sample.csv")
【讨论】:
我尝试了你提到的合并。它在指定路径创建一个目录,其中包含一个“part”文件和一个名为“_SUCCESS”的文件。你知道一种实际上只获取一个文件的方法吗? 不,我认为没有办法。 不是本地文件而是hdfs文件 我在此代码中发现了一个错误,与此代码生成的单个 csv 相比,我的带有 csv 分区的原始目录多出 1 列。我知道代码适用于微不足道的情况,但我最后两列的格式为concat('"', concat_ws(",", collect_list(some_column)), '"')
,在插入覆盖时效果很好,但当我选择所有列并写入这种格式时,即使标题是正确的,但它错误地识别了倒数第二列值填充两者并忽略其余部分
这是我的 csv 分区在 "USR",0,0,""css","shell","html","python","javascript"","381534,3960,1683,229869,1569090"
之前的样子,这是它们现在的样子 "\"USR\"",0,0,"\"\"css\"","\"shell\""
【参考方案3】:
最简单的方法是映射DataFrame的RDD并使用mkString:
df.rdd.map(x=>x.mkString(","))
从 Spark 1.5 开始(甚至在此之前)
df.map(r=>r.mkString(","))
也会这样做
如果你想要 CSV 转义,你可以使用 apache commons lang。例如这是我们正在使用的代码
def DfToTextFile(path: String,
df: DataFrame,
delimiter: String = ",",
csvEscape: Boolean = true,
partitions: Int = 1,
compress: Boolean = true,
header: Option[String] = None,
maxColumnLength: Option[Int] = None) =
def trimColumnLength(c: String) =
val col = maxColumnLength match
case None => c
case Some(len: Int) => c.take(len)
if (csvEscape) StringEscapeUtils.escapeCsv(col) else col
def rowToString(r: Row) =
val st = r.mkString("~-~").replaceAll("[\\pC|\\uFFFD]", "") //remove control characters
st.split("~-~").map(trimColumnLength).mkString(delimiter)
def addHeader(r: RDD[String]) =
val rdd = for (h <- header;
if partitions == 1; //headers only supported for single partitions
tmpRdd = sc.parallelize(Array(h))) yield tmpRdd.union(r).coalesce(1)
rdd.getOrElse(r)
val rdd = df.map(rowToString).repartition(partitions)
val headerRdd = addHeader(rdd)
if (compress)
headerRdd.saveAsTextFile(path, classOf[GzipCodec])
else
headerRdd.saveAsTextFile(path)
【讨论】:
虽然这是最简单的答案(也是一个很好的答案),但如果您的文本中有双引号,则必须考虑它们。 为表scala创建RDD后简单地得到错误> df.rdd.map(x=>x.mkString(","));上面关于 spark-csv 的答案是正确的,但存在一个问题——该库根据数据框分区创建了多个文件。这不是我们通常需要的。因此,您可以将所有分区合并为一个:
df.coalesce(1).
write.
format("com.databricks.spark.csv").
option("header", "true").
save("myfile.csv")
并将库的输出(名称“part-00000”)重命名为所需的文件名。
这篇博文提供了更多详细信息:https://fullstackml.com/2015/12/21/how-to-export-data-frame-from-apache-spark/
【讨论】:
应该是 df.repartition.write 而不是 df.write.repartition ? @Cedric 你是对的,谢谢!先重新分区!已编辑。 如果希望继续写入现有文件,也可以添加模型。resultDF.repartition(1).write.mode("append").format("com.databricks.spark.csv").option("header", "true").save("s3://...")
coalesce(1)
要求数据集适合单台机器的堆,并且在处理大型数据集时很可能会导致问题
@DmitryPetrov 我们是否需要在包含合并选项时提及 write.format("com...") 选项?【参考方案5】:
由于 Spark 2.X
spark-csv
被集成为 native datasource。因此,必要的语句简化为 (windows)
df.write
.option("header", "true")
.csv("file:///C:/out.csv")
或 UNIX
df.write
.option("header", "true")
.csv("/var/out.csv")
注意:正如 cmets 所说,它使用该名称创建目录,其中包含分区,而不是 standard CSV file。但是,这很可能是您想要的,因为否则您的驱动程序会崩溃(内存不足),或者您可能正在使用非分布式环境。
【讨论】:
大家好,有没有办法替换文件,因为它尝试重写文件时失败。 当然!.mode("overwrite").csv("/var/out.csv")
在 Spark 2.x 中,它使用该名称创建目录。有什么帮助吗?
我猜你的分区在那个目录里面。
但是它不是标准的 CSV 文件,它正在生成一个包含奇怪文件的文件夹 (!)。见***.com/q/58142220/287948【参考方案6】:
借助 spark-csv,我们可以写入 CSV 文件。
val dfsql = sqlContext.sql("select * from tablename")
dfsql.write.format("com.databricks.spark.csv").option("header","true").save("output.csv")`
【讨论】:
不,它不是真正的 CSV 文件,结果 output.csv 是一个文件夹。【参考方案7】:在此处在 DATAFRAME 中输入代码:
val p=spark.read.format("csv").options(Map("header"->"true","delimiter"->"^")).load("filename.csv")
【讨论】:
以上是关于如何将数据从 Spark SQL 导出到 CSV的主要内容,如果未能解决你的问题,请参考以下文章
PostgreSQL:将结果数据从 SQL 查询导出到 Excel/CSV
Spark-SQL:如何将 TSV 或 CSV 文件读入数据框并应用自定义模式?
将数据从 SQL Server Express 导出到 CSV(需要引用和转义)