如何在 Scala/Spark 中为数据框中的每一行编写一个 Json 文件并重命名文件

Posted

技术标签:

【中文标题】如何在 Scala/Spark 中为数据框中的每一行编写一个 Json 文件并重命名文件【英文标题】:How to write one Json file for each row from the dataframe in Scala/Spark and rename the files 【发布时间】:2019-02-07 21:24:15 【问题描述】:

需要为数据框中的每一行创建一个 json 文件。我正在使用 PartitionBy 为每个文件创建子文件夹。有没有办法避免创建子文件夹并使用唯一键重命名 json 文件? 或任何其他选择?它是一个巨大的数据框,包含数千个 (~300K) 的唯一值,因此 Repartition 占用了大量资源并花费时间。谢谢。

df.select(Seq(col("UniqueField").as("UniqueField_Copy")) ++ 
df.columns.map(col): _*)       
.write.partitionBy("UniqueField")
.mode("overwrite").format("json").save("c:\temp\json\")

【问题讨论】:

【参考方案1】:

将所有输出放在一个目录中

您的示例代码在 DataFrameWriter 对象上调用 partitionBy。文档告诉我们这个函数:

按文件系统上的给定列对输出进行分区。如果指定,则输出布局在文件系统上,类似于 Hive 的分区方案。例如,当我们按年和月对数据集进行分区时,目录布局如下所示:

年=2016/月=01/

年=2016/月=02/

这就是您获得子目录的原因。只需删除对partitionBy 的调用即可将所有输出放在一个目录中。

每个文件获取一行

Spark SQL

您有一个正确的想法,即按UniqueField 对数据进行分区,因为 Spark 每个分区写入一个文件。而不是使用DataFrameWriter的分区,你可以使用

df.repartitionByRange(numberOfJson, $"UniqueField")

获取所需数量的分区,每个分区一个 JSON。请注意,这需要您提前知道最终将获得的 JSON 数量。你可以计算它

val numberOfJson = df.select(count($"UniqueField")).first.getAs[Long](0)

但是,这会为您的查询添加一个额外的操作,这将导致您的 整个 数据集再次被计算。听起来您的数据集太大而无法放入内存,因此您需要仔细考虑使用df.cache(或df.checkpoint)进行缓存(或检查点)是否真的可以节省计算时间。 (对于不需要大量计算来创建的大型数据集,重新计算实际上可以更快)

RDD

使用 Spark SQL API 的替代方法是下拉到较低级别的 RDD。在this question 的回答中彻底讨论了 RDD 的按键分区(在 pyspark 中)。在 scala 中,您必须指定自定义 Partitioner,如 this question 中所述。

重命名 Spark 的输出文件

This 是一个相当普遍的问题,而 AFAIK 的共识是这是不可能的。

希望对您有所帮助,欢迎使用 Stack Overflow!

【讨论】:

以上是关于如何在 Scala/Spark 中为数据框中的每一行编写一个 Json 文件并重命名文件的主要内容,如果未能解决你的问题,请参考以下文章

如何访问存储在scala spark中的数据框中的映射值和键

Scala(Spark)连接数据框中的列[重复]

如何在数据框中的每一列上运行 udf?

如何遍历熊猫数据框中的每一列和每个单元格

如何在 Scala/Spark 的数据框中扩展数组 [重复]

在 scala spark 数据框中提取时间间隔