在pyspark中展平嵌套的json scala代码

Posted

技术标签:

【中文标题】在pyspark中展平嵌套的json scala代码【英文标题】:flatten nested json scala code in pyspark 【发布时间】:2021-03-02 11:02:17 【问题描述】:

尝试在 pyspark 中执行以下 scala 代码:

val maxJsonParts = 3 // whatever that number is...
val jsonElements = (0 until maxJsonParts)
                     .map(i => get_json_object($"Payment", s"$$[$i]"))

val newDF = dataframe
  .withColumn("Payment", explode(array(jsonElements: _*)))
  .where(!isnull($"Payment"))

例如,我正在尝试制作一个嵌套列,例如在下面的付款列中:

id name payment
1 James [ "@id": 1, "currency":"GBP","@id": 2, "currency": "USD" ]

变成:

id name payment
1 James "@id": 1, "currency":"GBP"
1 James "@id":2, "currency":"USD"

表架构:

root
|-- id: integer (nullable = true)
|-- Name: string (nullable = true)   
|-- Payment: string (nullable = true)

我尝试在 Pyspark 中编写此代码,但它只是将嵌套列(付款)变为 null:

lst = [range(0,10)]
jsonElem = [F.get_json_object(F.col("payment"), f"$[i]") for i in lst]
bronzeDF = bronzeDF.withColumn("payment2", F.explode(F.array(*jsonElem)))
bronzeDF.show()

非常感谢任何帮助。

【问题讨论】:

pyspark 的界面和scala 的很相似。如果您有任何具体问题,请尝试自己重写并来这里。 “为我写这个,但用 Python”不是 SO 的问题。 【参考方案1】:

这是另一种方法,它允许您根据正确的模式解析给定的 JSON 以生成支付数组。该解决方案基于from_json 函数,该函数允许您将字符串 JSON 解析为结构类型。

from pyspark.sql.types import IntegerType, StringType, ArrayType, StructField
from pyspark.sql.functions import from_json, explode

data = [
  (1, 'James', '[ "@id": 1, "currency":"GBP","@id": 2, "currency": "USD" ]'), 
  (2, 'Tonny', '[ "@id": 3, "currency":"EUR","@id": 4, "currency": "USD" ]'), 
]
df = spark.createDataFrame(data, ['id', 'name', 'payment'])

str_schema = 'array<struct<`@id`:int,`currency`:string>>'

# st_schema = ArrayType(StructType([
#                 StructField('@id', IntegerType()),
#                 StructField('currency', StringType())]))

df = df.withColumn("payment", explode(from_json(df["payment"], str_schema)))

df.show()

# +---+-----+--------+
# | id| name| payment|
# +---+-----+--------+
# |  1|James|[1, GBP]|
# |  1|James|[2, USD]|
# |  2|Tonny|[3, EUR]|
# |  2|Tonny|[4, USD]|
# +---+-----+--------+

注意: 如您所见,您可以在模式的字符串表示或ArrayType 之间进行选择。两者都应该产生相同的结果。

【讨论】:

我意识到explode 从字面上做同样的事情:')。我记得我花了几个小时试图找出等效的 pyspark 代码并成功了,而 explode 函数做同样事情的事实却被大大地忽视了【参考方案2】:

我找到了解决方案:

首先将列转换为字符串类型如下:

bronzeDF = bronzeDF.withColumn("payment2", F.to_json("payment")).drop("payment")

然后可以在列上执行以下代码,将n个嵌套的json对象堆叠为具有相同外键值的单独行:

max_json_parts = 50
lst = [f for f in range(0, max_json_parts, 1)]
jsonElem = [F.get_json_object(F.col("payment2"), f"$[i]") for i in lst]
bronzeDF = bronzeDF.withColumn("payment2", F.explode(F.array(*jsonElem))).where(F.col("payment2").isNotNull())

然后转换回具有定义架构的结构,并将对象键分解为单独的列:

bronzeDF = bronzeDF.withColumn("temp", F.from_json("payment2", jsonSchemaPayment)).select("*", "temp.*").drop("payment2")

【讨论】:

以上是关于在pyspark中展平嵌套的json scala代码的主要内容,如果未能解决你的问题,请参考以下文章

使用 Azure Synapse pyspark 过滤器根据嵌套对象的数据类型展平嵌套的 json 对象

展平任何嵌套的 json 字符串并使用 spark scala 转换为数据帧

复杂和嵌套的 json 数据集如何与 pyspark 一起使用

Scala嵌套数组展平

在 Pyspark 中展平 Json

Json - 在 pyspark 中展平键和值