在 Pyspark SQL 中展开 JSON

Posted

技术标签:

【中文标题】在 Pyspark SQL 中展开 JSON【英文标题】:Explode JSON in PysparkSQL 【发布时间】:2021-12-29 08:01:23 【问题描述】:

我希望将嵌套的 json 分解为 CSV 文件。 希望将嵌套的 json 解析为行和列。

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql import Row
df=spark.read.option("multiline","true").json("sample1.json")
df.printSchema()

root
 |-- pid: struct (nullable = true)
 |    |-- Body: struct (nullable = true)
 |    |    |-- Vendor: struct (nullable = true)
 |    |    |    |-- RC: struct (nullable = true)
 |    |    |    |    |-- Updated_From_Date: string (nullable = true)
 |    |    |    |    |-- Updated_To_Date: string (nullable = true)
 |    |    |    |-- RD: struct (nullable = true)
 |    |    |    |    |-- Supplier: struct (nullable = true)
 |    |    |    |    |    |-- Supplier_Data: struct (nullable = true)
 |    |    |    |    |    |    |-- Days: long (nullable = true)
 |    |    |    |    |    |    |-- Reference: struct (nullable = true)
 |    |    |    |    |    |    |    |-- ID: array (nullable = true)
 |    |    |    |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |    |    |    |-- Expected: long (nullable = true)
 |    |    |    |    |    |    |-- Payments: long (nullable = true)
 |    |    |    |    |    |    |-- Approval: struct (nullable = true)
 |    |    |    |    |    |    |    |-- ID: array (nullable = true)
 |    |    |    |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |    |    |    |-- Areas_Changed: struct (nullable = true)
 |    |    |    |    |    |    |    |-- Alternate_Names: long (nullable = true)
 |    |    |    |    |    |    |    |-- Attachments: long (nullable = true)
 |    |    |    |    |    |    |    |-- Classifications: long (nullable = true)
 |    |    |    |    |    |    |    |-- Contact_Information: long (nullable = true)

我的代码:

df2=(df.select(F.explode("pid").alias('pid'))
         .select('pid.*')
         .select(F.explode('Body').alias('Body'))
         .select('Body.*')
         .select((F.explode('Vendor').alias('Vendor'))
         .select('Vendor.*')
         .select((F.explode('RC').alias('RC'))
         .select('RC.*'))))

错误: AnalysisException: 由于数据类型不匹配,无法解析“explode(pid)”:函数explode 的输入应该是数组或映射类型,而不是struct

如何解析为结构字段。 任何帮助将不胜感激:)

【问题讨论】:

【参考方案1】:

您只能在映射或数组类型上使用explode 函数。要访问 strcut 类型,只需使用 . 运算符。

假设你想获取 RC 和 RD 下的列,那么代码语法应该如下所示。

df.select("pid.Body.Vendor.RC.*", "pid.Body.Vendor.RD.*")

【讨论】:

这对我有用,但你能建议我如何为数组设置单独的列名。例如,RD.Supplier.Supplier_Data.Reference.ID 在这种情况下如果有 2 个元素,那么如何将它们分成 2 个不同的列?-谢谢 :) 假设数组列 ID 有 2 个元素,您想要创建 2 个不同的列,那么您可以使用数组索引来实现。 df.selectExpr("RD.Supplier.Supplier_Data.Reference.ID[0] as array1","RD.Supplier.Supplier_Data.Reference.ID[1] as array2")df.select(col("RD.Supplier.Supplier_Data.Reference.ID").getItem(0).as("array1"))

以上是关于在 Pyspark SQL 中展开 JSON的主要内容,如果未能解决你的问题,请参考以下文章

在 Pyspark 代码中读取嵌套的 Json 文件。 pyspark.sql.utils.AnalysisException:

PySpark:在 sql 中访问向量元素

在 pyspark 中加载 SQL 查询?

无法使用 PySpark 插入 SQL,但可以在 SQL 中使用

Pyspark:将 sql 查询转换为 pyspark?

在 pyspark sql 的连接中重复使用相同的数据框视图