将多个 JSON 文件合并为单个 JSON 和 parquet 文件
Posted
技术标签:
【中文标题】将多个 JSON 文件合并为单个 JSON 和 parquet 文件【英文标题】:Merge multiple JSON file to single JSON and parquet file 【发布时间】:2020-05-01 16:44:02 【问题描述】:具有 100 个 JSON 的源 S3 位置
-
所有 JSON 文件都需要合并为单个 JSON 文件。即非
part-0000...
文件
输出的单个 JSON 文件需要替换源 S3 位置上的所有这些文件
相同的 JSON 文件需要转换 Parquet 并保存到其他 S3 位置
除了下面还有什么最好的选择吗,
-
将 JSON 文件加载到 Dataframe 中
保存到本地磁盘
将合并后的 JSON 文件上传到 S3
使用 AWS 开发工具包客户端 API 成功上传组合的 S3 文件后,清除其余的 S3 文件
这与 4 并行运行。通过数据帧 API 将 parquet 文件保存到 parquet S3 位置
我对上述设计有以下疑问
还有更稳健的方法吗? 我可以读取和写入相同的 S3 位置并跳过步骤号吗? 2.【问题讨论】:
文件是否都在一个目录中,日分区,还是别的? 所有json文件都在一个目录中 检查答案希望有帮助。除非有审计要求,否则不再需要将中间内容保存到本地磁盘。 如果您使用 s3a 文件系统访问,则可以。 hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/… 确定@RamGhadiyaram 【参考方案1】:是的,它可以跳过 #2。可以使用SaveMode.Overwrite
写入相同的位置来完成读取操作。
当您第一次读取 json 即 #1 作为数据帧时,如果您进行缓存,它将在内存中。之后,您可以进行清理并将所有 json 与 union 合并为一个,并在一个步骤中存储在 parquet 文件中。类似于此示例。案例 1:所有 json 都位于不同的文件夹中,您希望它们将最终数据帧作为镶木地板存储在 json 所在的同一位置...
val dfpath1 = spark.read.json("path1")
val dfpath2 = spark.read.json("path2")
val dfpath3 = spark.read.json("path3")
val df1 = cleanup1 function dfpath1 returns dataframe
val df2 = cleanup2 function dfpath2 returns dataframe
val df3 = cleanup3 function dfpath3 returns dataframe
val dfs = Seq(df1, df2, df3)
val finaldf = dfs.reduce(_ union _) // you should have same schema while doing union..
finaldf.write.mode(SaveMode.Overwrite).parquet("final_file with samelocations json.parquet")
案例 2:所有 json 都在同一个文件夹中,您希望它们将最终数据帧作为多个 parquet 存储在 json 所在的同一根位置...
在这种情况下,无需读取多个数据帧,您可以提供具有相同架构的 json 的根路径
val dfpath1 = spark.read.json("rootpathofyourjsons with same schema")
// or you can give multiple paths spark.read.json("path1","path2","path3")
// since it s supported by spark dataframe reader like this ...def json(paths: String*):
val finaldf = cleanup1 function returns dataframe
finaldf.write.mode(SaveMode.Overwrite).parquet("final_file with sameroot locations json.parquet")
AFAIK,无论哪种情况,都不再需要 aws s3 sdk api。
更新:Reg. File Not Found Exception you are facing... see below code example of how to do it. I quoted the same example you showed me here
import org.apache.spark.sql.functions._
val df = Seq((1, 10), (2, 20), (3, 30)).toDS.toDF("sex", "date")
df.show(false)
df.repartition(1).write.format("parquet").mode("overwrite").save(".../temp") // save it
val df1 = spark.read.format("parquet").load(".../temp") // read back again
val df2 = df1.withColumn("cleanup" , lit("Quick silver want to cleanup")) // like you said you want to clean it.
//BELOW 2 ARE IMPORTANT STEPS LIKE `cache` and `show` forcing a light action show(1) with out which FileNotFoundException will come.
df2.cache // cache to avoid FileNotFoundException
df2.show(2, false) // light action to avoid FileNotFoundException
// or println(df2.count) // action
df2.repartition(1).write.format("parquet").mode("overwrite").save(".../temp")
println("quick silver saved in same directory where he read it from final records he saved after clean up are ")
df2.show(false)
结果:
+---+----+
|sex|date|
+---+----+
|1 |10 |
|2 |20 |
|3 |30 |
+---+----+
+---+----+----------------------------+
|sex|date|cleanup |
+---+----+----------------------------+
|1 |10 |Quick silver want to cleanup|
|2 |20 |Quick silver want to cleanup|
+---+----+----------------------------+
only showing top 2 rows
quick silver saved in same directory where he read it from final records he saved after clean up are
+---+----+----------------------------+
|sex|date|cleanup |
+---+----+----------------------------+
|1 |10 |Quick silver want to cleanup|
|2 |20 |Quick silver want to cleanup|
|3 |30 |Quick silver want to cleanup|
+---+----+----------------------------+
文件保存和回读清理并再次保存的屏幕截图:
注意: 您需要像上面建议的更新那样实施 case 1 或 case 2...
【讨论】:
我已经在 scala 中尝试了上面的代码,但它失败了,因为它在写入之前清除了目标目录。原因:java.io.FileNotFoundException:没有这样的文件或目录:s3a://spark.read
.json(sourcePath)
.coalesce(1)
.write
.mode(SaveMode.Overwrite)
.json(tempTarget1)
val fs = FileSystem.get(new URI(s"s3a://$bucketName"), sc.hadoopConfiguration)
val deleted = fs
.delete(new Path(sourcePath + File.separator), true)
logger.info(s"S3 folder path deleted=$deleted sparkUuid=$sparkUuid path=$sourcePath")
val renamed = fs
.rename(new Path(tempTarget1),new Path(sourcePath))
尝试并失败,
-
数据帧缓存/持久化不起作用,因为每当我尝试编写
cachedDf.write
时,我会返回检查我在写入之前手动清理的 S3 文件。
直接将 Dataframe 写入同一个 S3 目录不起作用,因为 Dataframe 只会覆盖已分区的文件,即以“part-00...”开头的文件。
【讨论】:
以上是关于将多个 JSON 文件合并为单个 JSON 和 parquet 文件的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Gulp 将多个 (npm) package.json 文件合并为一个?
使用角度js将具有相同ID的重复对象从json数据合并为单个对象
Extjs 4,如何使用单个 Json 文件为多个动态网格发送多个元数据