将多个 JSON 文件合并为单个 JSON 和 parquet 文件

Posted

技术标签:

【中文标题】将多个 JSON 文件合并为单个 JSON 和 parquet 文件【英文标题】:Merge multiple JSON file to single JSON and parquet file 【发布时间】:2020-05-01 16:44:02 【问题描述】:

具有 100 个 JSON 的源 S3 位置

    所有 JSON 文件都需要合并为单个 JSON 文件。即非 part-0000... 文件 输出的单个 JSON 文件需要替换源 S3 位置上的所有这些文件 相同的 JSON 文件需要转换 Parquet 并保存到其他 S3 位置

除了下面还有什么最好的选择吗,

    将 JSON 文件加载到 Dataframe 中 保存到本地磁盘 将合并后的 JSON 文件上传到 S3 使用 AWS 开发工具包客户端 API 成功上传组合的 S3 文件后,清除其余的 S3 文件 这与 4 并行运行。通过数据帧 API 将 parquet 文件保存到 parquet S3 位置

我对上述设计有以下疑问

还有更稳健的方法吗? 我可以读取和写入相同的 S3 位置并跳过步骤号吗? 2.

【问题讨论】:

文件是否都在一个目录中,日分区,还是别的? 所有json文件都在一个目录中 检查答案希望有帮助。除非有审计要求,否则不再需要将中间内容保存到本地磁盘。 如果您使用 s3a 文件系统访问,则可以。 hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/… 确定@RamGhadiyaram 【参考方案1】:

是的,它可以跳过 #2。可以使用SaveMode.Overwrite 写入相同的位置来完成读取操作。

当您第一次读取 json 即 #1 作为数据帧时,如果您进行缓存,它将在内存中。之后,您可以进行清理并将所有 json 与 union 合并为一个,并在一个步骤中存储在 parquet 文件中。类似于此示例。案例 1:所有 json 都位于不同的文件夹中,您希望它们将最终数据帧作为镶木地板存储在 json 所在的同一位置...

val dfpath1 = spark.read.json("path1")
val dfpath2 =  spark.read.json("path2")
val dfpath3 =  spark.read.json("path3")

val df1 = cleanup1 function dfpath1 returns dataframe
val df2 = cleanup2 function dfpath2 returns dataframe
val df3 = cleanup3 function dfpath3 returns dataframe

val dfs = Seq(df1, df2, df3)
val finaldf = dfs.reduce(_ union _) // you should have same schema while doing union..

 
  finaldf.write.mode(SaveMode.Overwrite).parquet("final_file with samelocations json.parquet")

案例 2:所有 json 都在同一个文件夹中,您希望它们将最终数据帧作为多个 parquet 存储在 json 所在的同一根位置...

在这种情况下,无需读取多个数据帧,您可以提供具有相同架构的 json 的根路径

val dfpath1 = spark.read.json("rootpathofyourjsons with same schema")

// or you can give multiple paths spark.read.json("path1","path2","path3")
 // since it s supported by spark dataframe reader like this ...def json(paths: String*):
val finaldf = cleanup1 function returns  dataframe
finaldf.write.mode(SaveMode.Overwrite).parquet("final_file with sameroot locations json.parquet")

AFAIK,无论哪种情况,都不再需要 aws s3 sdk api。

更新:Reg. File Not Found Exception you are facing... see below code example of how to do it. I quoted the same example you showed me here

import org.apache.spark.sql.functions._
  val df = Seq((1, 10), (2, 20), (3, 30)).toDS.toDF("sex", "date")

  df.show(false)

  df.repartition(1).write.format("parquet").mode("overwrite").save(".../temp") // save it
  val df1 = spark.read.format("parquet").load(".../temp") // read back again

 val df2 = df1.withColumn("cleanup" , lit("Quick silver want to cleanup")) // like you said you want to clean it.

  //BELOW 2 ARE IMPORTANT STEPS LIKE `cache` and `show` forcing a light action show(1) with out which FileNotFoundException will come.

  df2.cache // cache to avoid FileNotFoundException
  df2.show(2, false) // light action to avoid FileNotFoundException
   // or println(df2.count) // action

   df2.repartition(1).write.format("parquet").mode("overwrite").save(".../temp")
  println("quick silver saved in same directory where he read it from final records he saved after clean up are  ")
  df2.show(false)

结果:

+---+----+
|sex|date|
+---+----+
|1  |10  |
|2  |20  |
|3  |30  |
+---+----+

+---+----+----------------------------+
|sex|date|cleanup                     |
+---+----+----------------------------+
|1  |10  |Quick silver want to cleanup|
|2  |20  |Quick silver want to cleanup|
+---+----+----------------------------+
only showing top 2 rows

quick silver saved in same directory where he read it from final records he saved after clean up are  
+---+----+----------------------------+
|sex|date|cleanup                     |
+---+----+----------------------------+
|1  |10  |Quick silver want to cleanup|
|2  |20  |Quick silver want to cleanup|
|3  |30  |Quick silver want to cleanup|
+---+----+----------------------------+


文件保存和回读清理并再次保存的屏幕截图:

注意: 您需要像上面建议的更新那样实施 case 1 case 2...

【讨论】:

我已经在 scala 中尝试了上面的代码,但它失败了,因为它在写入之前清除了目标目录。原因:java.io.FileNotFoundException:没有这样的文件或目录:s3a:// aws s3 ls yourdir name ... 显然找不到文件 请不要强迫我接受错误的答案。在建议之前,请做一些研究并自己尝试代码。你可以从这个 URL forums.databricks.com/questions/21830/… 开始学习 spark 第一件事是您没有指定任何示例或位置或数据框其完整的纯文本....Try to improve your self in asking question by putting good examples and code snippet. you asked in genaral case I answered in the same way, 由于星期天的活动,我无法回头......你能快速看看上面的内容。 I gave answer to databricks forums also【参考方案2】:
spark.read
                  .json(sourcePath)
                  .coalesce(1)
                  .write
                  .mode(SaveMode.Overwrite)
                  .json(tempTarget1)

                val fs = FileSystem.get(new URI(s"s3a://$bucketName"), sc.hadoopConfiguration)

                val deleted = fs
                  .delete(new Path(sourcePath + File.separator), true)
                logger.info(s"S3 folder path deleted=$deleted sparkUuid=$sparkUuid path=$sourcePath")

                val renamed = fs
                  .rename(new Path(tempTarget1),new Path(sourcePath))

尝试并失败,

    数据帧缓存/持久化不起作用,因为每当我尝试编写 cachedDf.write 时,我会返回检查我在写入之前手动清理的 S3 文件。 直接将 Dataframe 写入同一个 S3 目录不起作用,因为 Dataframe 只会覆盖已分区的文件,即以“part-00...”开头的文件。

【讨论】:

以上是关于将多个 JSON 文件合并为单个 JSON 和 parquet 文件的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Gulp 将多个 (npm) package.json 文件合并为一个?

mySQL中的JSON操作

使用角度js将具有相同ID的重复对象从json数据合并为单个对象

Extjs 4,如何使用单个 Json 文件为多个动态网格发送多个元数据

使用 Jackson 将单个文件中的多个 JSON 对象读入 Java

在s3中使用pyspark合并多个小json文件[重复]