PySpark 将 JSON 字符串分解为多列
Posted
技术标签:
【中文标题】PySpark 将 JSON 字符串分解为多列【英文标题】:PySpark Explode JSON String into Multiple Columns 【发布时间】:2021-10-25 20:51:07 【问题描述】:我有一个包含字符串数据类型列的数据框。该字符串表示一个返回json的api请求。
df = spark.createDataFrame([
("[original=ranking=1.0, input=top3, response=[to=Sam, position=guard, to=John, position=center, to=Andrew, position=forward]]",1)],
"col1:string, col2:int")
df.show()
生成如下数据框:
+--------------------+----+
| col1|col2|
+--------------------+----+
|[original=ranki...| 1|
+--------------------+----+
我想要 col2 的输出,并从响应中增加两列。 Col3 将捕获玩家姓名,由 to= 指示,col 4 将其位置由 position= 指示。以及数据框现在将有三行,因为有三个玩家。示例:
+----+------+-------+
|col2| col3| col4|
+----+------+-------+
| 1| Sam| guard|
| 1| John| center|
| 1|Andrew|forward|
+----+------+-------+
我了解到我可以利用以下内容:
df.withColumn("col3",explode(from_json("col1")))
但是,鉴于我想要两列而不是一列并且需要架构,我不确定如何展开。
注意,我可以使用 json_dumps 修改响应以仅返回字符串的响应片段或...
[to=Sam, position=guard, to=John, position=center, to=Andrew, position=forward]]
【问题讨论】:
是否可以更改 API,使其返回 实际 json 字符串,例如"to":"Sam"
,而不是 to=Sam
?
是的,我在响应中调用了 .json()。相反,我可以做一个 json.loads() 并维护 json 字符串。然后看起来像“['original':'ranking':1.0, 'input':'top3', 'response':['to':'Sam', 'position':'guard', 'to':'John', 'position':'center', 'to':'Andrew', 'position':'forward']]",1)], "col1:string, col2 :int" .
【参考方案1】:
如果你像提到的那样简化输出,你可以定义一个简单的 JSON 模式并将 JSON 字符串转换为 StructType
并读取每个字段
输入
df = spark.createDataFrame([("['to': 'Sam', 'position': 'guard','to': 'John', 'position': 'center','to': 'Andrew', 'position': 'forward']",1)], "col1:string, col2:int")
# +-----------------------------------------------------------------------------------------------------------------+----+
# |col1 |col2|
# +-----------------------------------------------------------------------------------------------------------------+----+
# |['to': 'Sam', 'position': 'guard','to': 'John', 'position': 'center','to': 'Andrew', 'position': 'forward']|1 |
# +-----------------------------------------------------------------------------------------------------------------+----+
这就是转型
from pyspark.sql import functions as F
from pyspark.sql import types as T
schema = T.ArrayType(T.StructType([
T.StructField('to', T.StringType()),
T.StructField('position', T.StringType())
]))
(df
.withColumn('temp', F.explode(F.from_json('col1', schema=schema)))
.select(
F.col('col2'),
F.col('temp.to').alias('col3'),
F.col('temp.position').alias('col4'),
)
.show()
)
# Output
# +----+------+-------+
# |col2| col3| col4|
# +----+------+-------+
# | 1| Sam| guard|
# | 1| John| center|
# | 1|Andrew|forward|
# +----+------+-------+
【讨论】:
以上是关于PySpark 将 JSON 字符串分解为多列的主要内容,如果未能解决你的问题,请参考以下文章