如何在 Scala/Spark 中为数据框中的每一行编写一个 Json 文件并重命名文件
Posted
技术标签:
【中文标题】如何在 Scala/Spark 中为数据框中的每一行编写一个 Json 文件并重命名文件【英文标题】:How to write one Json file for each row from the dataframe in Scala/Spark and rename the files 【发布时间】:2019-02-07 21:24:15 【问题描述】:需要为数据框中的每一行创建一个 json 文件。我正在使用 PartitionBy 为每个文件创建子文件夹。有没有办法避免创建子文件夹并使用唯一键重命名 json 文件? 或任何其他选择?它是一个巨大的数据框,包含数千个 (~300K) 的唯一值,因此 Repartition 占用了大量资源并花费时间。谢谢。
df.select(Seq(col("UniqueField").as("UniqueField_Copy")) ++
df.columns.map(col): _*)
.write.partitionBy("UniqueField")
.mode("overwrite").format("json").save("c:\temp\json\")
【问题讨论】:
【参考方案1】:将所有输出放在一个目录中
您的示例代码在 DataFrameWriter
对象上调用 partitionBy
。文档告诉我们这个函数:
按文件系统上的给定列对输出进行分区。如果指定,则输出布局在文件系统上,类似于 Hive 的分区方案。例如,当我们按年和月对数据集进行分区时,目录布局如下所示:
年=2016/月=01/
年=2016/月=02/
这就是您获得子目录的原因。只需删除对partitionBy
的调用即可将所有输出放在一个目录中。
每个文件获取一行
Spark SQL
您有一个正确的想法,即按UniqueField
对数据进行分区,因为 Spark 每个分区写入一个文件。而不是使用DataFrameWriter
的分区,你可以使用
df.repartitionByRange(numberOfJson, $"UniqueField")
获取所需数量的分区,每个分区一个 JSON。请注意,这需要您提前知道最终将获得的 JSON 数量。你可以计算它
val numberOfJson = df.select(count($"UniqueField")).first.getAs[Long](0)
但是,这会为您的查询添加一个额外的操作,这将导致您的 整个 数据集再次被计算。听起来您的数据集太大而无法放入内存,因此您需要仔细考虑使用df.cache
(或df.checkpoint
)进行缓存(或检查点)是否真的可以节省计算时间。 (对于不需要大量计算来创建的大型数据集,重新计算实际上可以更快)
RDD
使用 Spark SQL API 的替代方法是下拉到较低级别的 RDD
。在this question 的回答中彻底讨论了 RDD 的按键分区(在 pyspark 中)。在 scala 中,您必须指定自定义 Partitioner
,如 this question 中所述。
重命名 Spark 的输出文件
This 是一个相当普遍的问题,而 AFAIK 的共识是这是不可能的。
希望对您有所帮助,欢迎使用 Stack Overflow!
【讨论】:
以上是关于如何在 Scala/Spark 中为数据框中的每一行编写一个 Json 文件并重命名文件的主要内容,如果未能解决你的问题,请参考以下文章
如何访问存储在scala spark中的数据框中的映射值和键