使用 s3-dist-cp 的 JSON 聚合用于 Spark 应用程序消费

Posted

技术标签:

【中文标题】使用 s3-dist-cp 的 JSON 聚合用于 Spark 应用程序消费【英文标题】:JSON aggregation using s3-dist-cp for Spark application consumption 【发布时间】:2020-04-07 19:54:25 【问题描述】:

我在 AWS EMR 上运行的 spark 应用程序从存储在 S3 中的 JSON 数组加载数据。然后通过 Spark 引擎处理从中创建的 Dataframe。

我的源 JSON 数据采用多个 S3 对象的形式。我需要将它们压缩成一个 JSON 数组,以减少从我的 Spark 应用程序中读取的 S3 对象的数量。我尝试使用“s3-dist-cp --groupBy”,但结果是串联的 JSON 数据,它本身不是有效的 JSON 文件,所以我无法从中创建数据框。

这里有一个简化的例子来进一步说明。

来源数据:

S3 对象 Record1.json : "Name" : "John", "City" : "London"

S3 对象 Record2.json : "Name" : "Mary" , "City" : "Paris"

s3-dist-cp --src s3://source/ --dest s3://dest/ --groupBy='.*Record.*(\w+)'

聚合输出

“姓名”:“玛丽”,“城市”:“巴黎”“姓名”:“约翰”,“城市”:“伦敦”

我需要什么:

[“姓名”:“约翰”,“城市”:“伦敦”,“姓名”:“玛丽”,“城市”:“巴黎”]

应用代码供参考

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StringType
val schema = new StructType()
                 .add("Name",StringType,true)
                 .add("City",StringType,true)

val df = spark.read.option("multiline","true").schema(schema).json("test.json")
df.show()

预期输出

+----+-----+

|名称|城市|

+----+-----+

|约翰|伦敦|

|玛丽|巴黎|

+----+-----+

s3-dist-cp 是否适合我的需要?关于将由 Spark 应用程序加载为 Dataframe 的 json 数据聚合的任何其他建议?

【问题讨论】:

你能解决吗?我有同样的问题,我需要在 EMR 中读取和转换很多小的 json 文件。我在 S3 中对数据进行了分区,但每个分区中有很多小文件。使用s3-dist-cp 将所有 S3 目录发送到 HDFS 没有完成。并直接从火花崩溃中读取。目前我正在迭代父分区并且它有点工作,但它确实效率低下。 【参考方案1】:

或者,您可以使用 regexp_replace 将单行字符串替换为 json 格式的多行字符串,然后再将其转换为数据集。

检查样本

val df = spark.read.text("test.json")\
    .withColumn("json", from_json(regexp_replace(col("value"), "\\", "\\n\"), schema))\
        .select("json.*")

df.show()

关于 regexp_replace: Pyspark replace strings in Spark dataframe column

【讨论】:

以上是关于使用 s3-dist-cp 的 JSON 聚合用于 Spark 应用程序消费的主要内容,如果未能解决你的问题,请参考以下文章

json 列上的聚合

postgreSql聚合函数row_to_json初使用

如何展平来自聚合输出的 JSON 结果

如何将对象响应对象的猫鼬聚合数组转换为json对象响应

Kendo UI - JSON 响应 - 使用带有服务器分组和服务器聚合的远程数据源的网格

jmeter断言,结果报告--6(响应断言和JSON断言,聚合报告,查看结果树)常见状态吗的排查方式