PySpark 将 JSON 字符串分解为多列

Posted

技术标签:

【中文标题】PySpark 将 JSON 字符串分解为多列【英文标题】:PySpark Explode JSON String into Multiple Columns 【发布时间】:2021-10-25 20:51:07 【问题描述】:

我有一个包含字符串数据类型列的数据框。该字符串表示一个返回json的api请求。

df = spark.createDataFrame([
           ("[original=ranking=1.0, input=top3, response=[to=Sam, position=guard, to=John, position=center, to=Andrew, position=forward]]",1)], 
           "col1:string, col2:int")
df.show()

生成如下数据框:

+--------------------+----+
|                col1|col2|
+--------------------+----+
|[original=ranki...|   1|
+--------------------+----+

我想要 col2 的输出,并从响应中增加两列。 Col3 将捕获玩家姓名,由 to= 指示,col 4 将其位置由 position= 指示。以及数据框现在将有三行,因为有三个玩家。示例:

+----+------+-------+
|col2|  col3|   col4|
+----+------+-------+
|   1|   Sam|  guard|
|   1|  John| center|
|   1|Andrew|forward|
+----+------+-------+

我了解到我可以利用以下内容:

df.withColumn("col3",explode(from_json("col1")))

但是,鉴于我想要两列而不是一列并且需要架构,我不确定如何展开。

注意,我可以使用 json_dumps 修改响应以仅返回字符串的响应片段或...

[to=Sam, position=guard, to=John, position=center, to=Andrew, position=forward]]

【问题讨论】:

是否可以更改 API,使其返回 实际 json 字符串,例如 "to":"Sam",而不是 to=Sam 是的,我在响应中调用了 .json()。相反,我可以做一个 json.loads() 并维护 json 字符串。然后看起来像“['original':'ranking':1.0, 'input':'top3', 'response':['to':'Sam', 'position':'guard', 'to':'John', 'position':'center', 'to':'Andrew', 'position':'forward']]",1)], "col1:string, col2 :int" . 【参考方案1】:

如果你像提到的那样简化输出,你可以定义一个简单的 JSON 模式并将 JSON 字符串转换为 StructType 并读取每个字段

输入

df = spark.createDataFrame([("['to': 'Sam', 'position': 'guard','to': 'John', 'position': 'center','to': 'Andrew', 'position': 'forward']",1)], "col1:string, col2:int")

# +-----------------------------------------------------------------------------------------------------------------+----+
# |col1                                                                                                             |col2|
# +-----------------------------------------------------------------------------------------------------------------+----+
# |['to': 'Sam', 'position': 'guard','to': 'John', 'position': 'center','to': 'Andrew', 'position': 'forward']|1   |
# +-----------------------------------------------------------------------------------------------------------------+----+

这就是转型

from pyspark.sql import functions as F
from pyspark.sql import types as T

schema = T.ArrayType(T.StructType([
    T.StructField('to', T.StringType()),
    T.StructField('position', T.StringType())
]))

(df
    .withColumn('temp', F.explode(F.from_json('col1', schema=schema)))
    .select(
        F.col('col2'),
        F.col('temp.to').alias('col3'),
        F.col('temp.position').alias('col4'),
    )
    .show()
)

# Output
# +----+------+-------+
# |col2|  col3|   col4|
# +----+------+-------+
# |   1|   Sam|  guard|
# |   1|  John| center|
# |   1|Andrew|forward|
# +----+------+-------+

【讨论】:

以上是关于PySpark 将 JSON 字符串分解为多列的主要内容,如果未能解决你的问题,请参考以下文章

PySpark:如何将列表分解为具有顺序命名的多列?

PySpark 根据名称将列表分解为多列

Pyspark将JSON对象列拆分为多列

Pyspark 数据框将多列转换为浮点数

Bigquery:将 SPLIT() 输出行分解为多列

如何将 map_keys() 中的值拆分为 PySpark 中的多列