在pyspark中展平嵌套的json scala代码
Posted
技术标签:
【中文标题】在pyspark中展平嵌套的json scala代码【英文标题】:flatten nested json scala code in pyspark 【发布时间】:2021-03-02 11:02:17 【问题描述】:尝试在 pyspark 中执行以下 scala 代码:
val maxJsonParts = 3 // whatever that number is...
val jsonElements = (0 until maxJsonParts)
.map(i => get_json_object($"Payment", s"$$[$i]"))
val newDF = dataframe
.withColumn("Payment", explode(array(jsonElements: _*)))
.where(!isnull($"Payment"))
例如,我正在尝试制作一个嵌套列,例如在下面的付款列中:
id | name | payment |
---|---|---|
1 | James | [ "@id": 1, "currency":"GBP","@id": 2, "currency": "USD" ] |
变成:
id | name | payment |
---|---|---|
1 | James | "@id": 1, "currency":"GBP" |
1 | James | "@id":2, "currency":"USD" |
表架构:
root
|-- id: integer (nullable = true)
|-- Name: string (nullable = true)
|-- Payment: string (nullable = true)
我尝试在 Pyspark 中编写此代码,但它只是将嵌套列(付款)变为 null:
lst = [range(0,10)]
jsonElem = [F.get_json_object(F.col("payment"), f"$[i]") for i in lst]
bronzeDF = bronzeDF.withColumn("payment2", F.explode(F.array(*jsonElem)))
bronzeDF.show()
非常感谢任何帮助。
【问题讨论】:
pyspark 的界面和scala 的很相似。如果您有任何具体问题,请尝试自己重写并来这里。 “为我写这个,但用 Python”不是 SO 的问题。 【参考方案1】:这是另一种方法,它允许您根据正确的模式解析给定的 JSON 以生成支付数组。该解决方案基于from_json 函数,该函数允许您将字符串 JSON 解析为结构类型。
from pyspark.sql.types import IntegerType, StringType, ArrayType, StructField
from pyspark.sql.functions import from_json, explode
data = [
(1, 'James', '[ "@id": 1, "currency":"GBP","@id": 2, "currency": "USD" ]'),
(2, 'Tonny', '[ "@id": 3, "currency":"EUR","@id": 4, "currency": "USD" ]'),
]
df = spark.createDataFrame(data, ['id', 'name', 'payment'])
str_schema = 'array<struct<`@id`:int,`currency`:string>>'
# st_schema = ArrayType(StructType([
# StructField('@id', IntegerType()),
# StructField('currency', StringType())]))
df = df.withColumn("payment", explode(from_json(df["payment"], str_schema)))
df.show()
# +---+-----+--------+
# | id| name| payment|
# +---+-----+--------+
# | 1|James|[1, GBP]|
# | 1|James|[2, USD]|
# | 2|Tonny|[3, EUR]|
# | 2|Tonny|[4, USD]|
# +---+-----+--------+
注意: 如您所见,您可以在模式的字符串表示或ArrayType
之间进行选择。两者都应该产生相同的结果。
【讨论】:
我意识到explode 从字面上做同样的事情:')。我记得我花了几个小时试图找出等效的 pyspark 代码并成功了,而 explode 函数做同样事情的事实却被大大地忽视了【参考方案2】:我找到了解决方案:
首先将列转换为字符串类型如下:
bronzeDF = bronzeDF.withColumn("payment2", F.to_json("payment")).drop("payment")
然后可以在列上执行以下代码,将n个嵌套的json对象堆叠为具有相同外键值的单独行:
max_json_parts = 50
lst = [f for f in range(0, max_json_parts, 1)]
jsonElem = [F.get_json_object(F.col("payment2"), f"$[i]") for i in lst]
bronzeDF = bronzeDF.withColumn("payment2", F.explode(F.array(*jsonElem))).where(F.col("payment2").isNotNull())
然后转换回具有定义架构的结构,并将对象键分解为单独的列:
bronzeDF = bronzeDF.withColumn("temp", F.from_json("payment2", jsonSchemaPayment)).select("*", "temp.*").drop("payment2")
【讨论】:
以上是关于在pyspark中展平嵌套的json scala代码的主要内容,如果未能解决你的问题,请参考以下文章
使用 Azure Synapse pyspark 过滤器根据嵌套对象的数据类型展平嵌套的 json 对象
展平任何嵌套的 json 字符串并使用 spark scala 转换为数据帧