使用 s3-dist-cp 的 JSON 聚合用于 Spark 应用程序消费
Posted
技术标签:
【中文标题】使用 s3-dist-cp 的 JSON 聚合用于 Spark 应用程序消费【英文标题】:JSON aggregation using s3-dist-cp for Spark application consumption 【发布时间】:2020-04-07 19:54:25 【问题描述】:我在 AWS EMR 上运行的 spark 应用程序从存储在 S3 中的 JSON 数组加载数据。然后通过 Spark 引擎处理从中创建的 Dataframe。
我的源 JSON 数据采用多个 S3 对象的形式。我需要将它们压缩成一个 JSON 数组,以减少从我的 Spark 应用程序中读取的 S3 对象的数量。我尝试使用“s3-dist-cp --groupBy”,但结果是串联的 JSON 数据,它本身不是有效的 JSON 文件,所以我无法从中创建数据框。
这里有一个简化的例子来进一步说明。
来源数据:
S3 对象 Record1.json : "Name" : "John", "City" : "London"
S3 对象 Record2.json : "Name" : "Mary" , "City" : "Paris"
s3-dist-cp --src s3://source/ --dest s3://dest/ --groupBy='.*Record.*(\w+)'
聚合输出
“姓名”:“玛丽”,“城市”:“巴黎”“姓名”:“约翰”,“城市”:“伦敦”
我需要什么:
[“姓名”:“约翰”,“城市”:“伦敦”,“姓名”:“玛丽”,“城市”:“巴黎”]
应用代码供参考
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StringType
val schema = new StructType()
.add("Name",StringType,true)
.add("City",StringType,true)
val df = spark.read.option("multiline","true").schema(schema).json("test.json")
df.show()
预期输出
+----+-----+
|名称|城市|
+----+-----+
|约翰|伦敦|
|玛丽|巴黎|
+----+-----+
s3-dist-cp 是否适合我的需要?关于将由 Spark 应用程序加载为 Dataframe 的 json 数据聚合的任何其他建议?
【问题讨论】:
你能解决吗?我有同样的问题,我需要在 EMR 中读取和转换很多小的 json 文件。我在 S3 中对数据进行了分区,但每个分区中有很多小文件。使用s3-dist-cp
将所有 S3 目录发送到 HDFS 没有完成。并直接从火花崩溃中读取。目前我正在迭代父分区并且它有点工作,但它确实效率低下。
【参考方案1】:
或者,您可以使用 regexp_replace 将单行字符串替换为 json 格式的多行字符串,然后再将其转换为数据集。
检查样本:
val df = spark.read.text("test.json")\
.withColumn("json", from_json(regexp_replace(col("value"), "\\", "\\n\"), schema))\
.select("json.*")
df.show()
关于 regexp_replace: Pyspark replace strings in Spark dataframe column
【讨论】:
以上是关于使用 s3-dist-cp 的 JSON 聚合用于 Spark 应用程序消费的主要内容,如果未能解决你的问题,请参考以下文章