将 Spark DataFrame 数据分成单独的文件

Posted

技术标签:

【中文标题】将 Spark DataFrame 数据分成单独的文件【英文标题】:Divide Spark DataFrame data into separate files 【发布时间】:2016-11-11 18:18:01 【问题描述】:

我有以下来自 s3 文件的 DataFrame 输入,需要将数据转换为以下所需的输出。我正在使用带有 Scala 的 Spark 版本 1.5.1,但可以使用 Python 更改为 Spark。欢迎提出任何建议。

数据帧输入:

name    animal   data
john    mouse    aaaaa
bob     mouse    bbbbb
bob     mouse    ccccc
bob     dog      ddddd

期望的输出:

john/mouse/file.csv
bob/mouse/file.csv
bob/dog/file.csv

terminal$ cat bob/mouse/file.csv
bbbbb
ccccc

terminal$ cat bob/dog/file.csv
ddddd

这是我尝试过的现有 Spark Scala 代码:

val sc = new SparkContext(new SparkConf())
val sqlc = new org.apache.spark.sql.SQLContext(sc)
val df = sqlc.read.json("raw.gz")
val cols = Seq("name", "animal")
df.groupBy(cols.head, cols.tail: _*).count().take(100).foreach(println)

电流输出:

[john,mouse,1]
[bob,mouse,2]
[bob,dog,1]

我现有代码的一些问题是 groupBy 返回一个 GroupedData 对象,我可能不想对该数据执行 count/sum/agg 函数。我正在寻找一种更好的技术来分组和输出数据。数据集非常大。

【问题讨论】:

【参考方案1】:

这可以使用DataFrameWriterpartitionBy 选项来实现。一般语法如下:

df.write.partitionBy("name", "animal").format(...).save(...)

不幸的是,在 Spark 1.5 中唯一支持分区的纯文本格式是 JSON。

如果您可以将 Spark 安装更新为:

1.6 - 您可以使用 partitionBytext 格式。如果您需要组的单个输出文件 (repartition),则还需要 1.6。 2.0 - 您可以使用partitionBycsv 格式。

我相信在 1.5 中您最好的选择是将文件编写为 JSON 并转换单个输出文件。

如果不同 name', 'animals 的数量很少,您可以尝试为每个组执行单独的写入:

val dist = df.select("name", "animal").rdd.collect.map 
  case Row(name: String, animal: String) => (name, animal)


for 
  (name, animal) <- dist
 df.where($"name" === name && $"animal" === animal)
    .select($"data").write.format("csv").save(s"/prefix/$name/$animal")

但这不会随着组合数量的增加而扩展。

【讨论】:

以上是关于将 Spark DataFrame 数据分成单独的文件的主要内容,如果未能解决你的问题,请参考以下文章

Pandas DataFrame [cell=(label,value)],分成 2 个单独的数据框

在 Apache Spark 中拆分 DataFrame

当数组很大时,在Scala中的Spark Dataframe中从数组列创建单独的列[重复]

如何保证 Spark Dataframe 中的重新分区

从 Spark DataFrame 中的单列派生多列

Spark DataFrame xml更改列名