How to create nested JSON using Apache Spark with Scala
Posted: 2019-09-25 10:33:32

Question: I am trying to create a nested JSON from my Spark dataframe, which holds data with the following structure.
Vendor_Name,count,Categories,Category_Count,Subcategory,Subcategory_Count
Vendor1,10,Category 1,4,Sub Category 1,1
Vendor1,10,Category 1,4,Sub Category 2,2
Vendor1,10,Category 1,4,Sub Category 3,3
Vendor1,10,Category 1,4,Sub Category 4,4
Desired JSON output, in the following format, using Apache Spark with Scala:
[
  {
    "vendor_name": "Vendor 1",
    "count": 10,
    "categories": [
      {
        "name": "Category 1",
        "count": 4,
        "subCategories": [
          {
            "name": "Sub Category 1",
            "count": 1
          },
          {
            "name": "Sub Category 2",
            "count": 1
          },
          {
            "name": "Sub Category 3",
            "count": 1
          },
          {
            "name": "Sub Category 4",
            "count": 1
          }
        ]
      }
    ]
  }
]
Comments:
Can you add the code for the dataframe creation, the schema, etc.?

Answer 1:

//read file into DataFrame
scala> val df = spark.read.format("csv").option("header", "true").load(<input CSV path>)
df: org.apache.spark.sql.DataFrame = [Vendor_Name: string, count: string ... 4 more fields]
scala> df.show(false)
+-----------+-----+----------+--------------+--------------+-----------------+
|Vendor_Name|count|Categories|Category_Count|Subcategory |Subcategory_Count|
+-----------+-----+----------+--------------+--------------+-----------------+
|Vendor1 |10 |Category 1|4 |Sub Category 1|1 |
|Vendor1 |10 |Category 1|4 |Sub Category 2|2 |
|Vendor1 |10 |Category 1|4 |Sub Category 3|3 |
|Vendor1 |10 |Category 1|4 |Sub Category 4|4 |
+-----------+-----+----------+--------------+--------------+-----------------+
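A note (not in the original answer): with header-only CSV reading, every column, including the counts, is loaded as a string, so the counts end up quoted in the JSON output. If numeric counts are wanted, as in the desired output above, one option is to let Spark infer the column types while reading:

//optional: infer numeric types so the counts are written as JSON numbers
scala> val df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(<input CSV path>)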
//convert into the desired JSON format
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> val df1 = df.
     |   groupBy("Vendor_Name", "count", "Categories", "Category_Count").
     |   agg(collect_list(struct(col("Subcategory").alias("name"), col("Subcategory_Count").alias("count"))).alias("subCategories")).
     |   groupBy("Vendor_Name", "count").
     |   agg(collect_list(struct(col("Categories").alias("name"), col("Category_Count").alias("count"), col("subCategories"))).alias("categories"))
df1: org.apache.spark.sql.DataFrame = [Vendor_Name: string, count: string ... 1 more field]
scala> df1.printSchema
root
|-- Vendor_Name: string (nullable = true)
|-- count: string (nullable = true)
|-- categories: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- count: string (nullable = true)
| | |-- subCategories: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- name: string (nullable = true)
| | | | |-- count: string (nullable = true)
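Before writing, you can sanity-check the nesting from the shell; toJSON renders each row as a JSON string (this preview step is an addition, not part of the original answer):

scala> df1.toJSON.show(false)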
//write df1 out in JSON format
scala> df1.write.format("json").mode("append").save(<output Path>)
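Note that Spark's JSON writer emits JSON Lines, i.e. one object per line, rather than the single bracketed array shown in the question. If a literal top-level array is required and the result is small enough to collect on the driver, one possible post-processing sketch:

//assemble one JSON array on the driver (only safe for small results)
scala> val jsonArray = "[" + df1.toJSON.collect().mkString(",") + "]"
scala> println(jsonArray)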
Comments:
Is there a link to documentation or a tutorial where I can learn more about collect_list, struct(), col(), and agg()?
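All four live in org.apache.spark.sql.functions (see the Spark SQL functions scaladoc). As a minimal, self-contained sketch of how they combine here: col() references a column, struct() packs several columns into one struct column, collect_list() gathers the structs of each group into an array, and agg() applies that aggregation per group. The dataframe below is a hypothetical toy, not from the question:

//toy example (names are illustrative only)
scala> import org.apache.spark.sql.functions._
scala> val toy = Seq(("a", 1), ("a", 2), ("b", 3)).toDF("key", "value")
scala> toy.groupBy("key").agg(collect_list(struct(col("key"), col("value"))).alias("items")).show(false)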