在 Pyspark 中展平 Json
Posted
技术标签:
【中文标题】在 Pyspark 中展平 Json【英文标题】:Flatten Json in Pyspark 【发布时间】:2021-04-08 23:24:31 【问题描述】:my_data=[
'stationCode': 'NB001',
'summaries': ['period': 'year': 2017, 'rainfall': 449,
'period': 'year': 2018, 'rainfall': 352.4,
'period': 'year': 2019, 'rainfall': 253.2,
'period': 'year': 2020, 'rainfall': 283,
'period': 'year': 2021, 'rainfall': 104.2],
'stationCode': 'NA003',
'summaries': ['period': 'year': 2019, 'rainfall': 58.2,
'period': 'year': 2020, 'rainfall': 628.2,
'period': 'year': 2021, 'rainfall': 120]]
在 Pandas 中我可以:
import pandas as pd
from pandas import json_normalize
pd.concat([json_normalize(entry, 'summaries', 'stationCode')
for entry in my_data])
这会给我下表:
rainfall period.year stationCode
0 449.0 2017 NB001
1 352.4 2018 NB001
2 253.2 2019 NB001
3 283.0 2020 NB001
4 104.2 2021 NB001
0 58.2 2019 NA003
1 628.2 2020 NA003
2 120.0 2021 NA003
这可以在pyspark的一行代码中实现吗?
我已经尝试了下面的代码,它给了我相同的结果。不过太长了,有没有办法缩短呢?;
df=sc.parallelize(my_data)
df1=spark.read.json(df)
df1.select("stationCode","summaries.period.year","summaries.rainfall").display()
df1 = df1.withColumn("year_rainfall", F.arrays_zip("year", "rainfall"))
.withColumn("year_rainfall", F.explode("year_rainfall"))
.select("stationCode",
F.col("year_rainfall.rainfall").alias("Rainfall"),
F.col("year_rainfall.year").alias("Year"))
df1.display(20, False)
向 pyspark 介绍自己,因此我们将不胜感激一些解释或良好的信息来源
【问题讨论】:
【参考方案1】:你所拥有的对我来说看起来不错并且可读。但是你也可以直接压缩和分解:
out = (df1.select("stationCode",
F.explode(F.arrays_zip(*["summaries.period.year","summaries.rainfall"])))
.select("stationCode",F.col("col")['0'].alias("year"),F.col("col")['1'].alias("rainfall")))
out.show()
+-----------+----+--------+
|stationCode|year|rainfall|
+-----------+----+--------+
| NB001|2017| 449.0|
| NB001|2018| 352.4|
| NB001|2019| 253.2|
| NB001|2020| 283.0|
| NB001|2021| 104.2|
| NA003|2019| 58.2|
| NA003|2020| 628.2|
| NA003|2021| 120.0|
+-----------+----+--------+
【讨论】:
【参考方案2】:考虑一个包含以下数据的示例 json 文件。
"Name": "TestName",
"Date": "2021-04-09",
"Readings": [
"Id": 1,
"Reading": 5.678,
"datetime": "2021-04-09 00:00:00"
,
"Id": 2,
"Reading": 3.692,
"datetime": "2020-04-09 00:00:00"
]
定义一个我们可以强制读取数据的架构。
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType
data_schema = StructType(fields=[
StructField('Name', StringType(), False),
StructField('Date', StringType(), True),
StructField(
'Readings', ArrayType(
StructType([
StructField('Id', IntegerType(), False),
StructField('Reading', DoubleType(), True),
StructField('datetime', StringType(), True)
])
)
)
])
现在我们可以使用我们的架构来读取目录中的 JSON 文件
data_df = spark.read.json('/mnt/data/' + '*.json', schema=data_schema)
我们想要嵌套在“Readings”中的数据,这样我们就可以使用explode 来获取这些子列。
from pyspark.sql.functions import explode
data_df = data_df.select(
"Name",
explode("Readings").alias("ReadingsExplode")
).select("Name", "ReadingsExplode.*")
data_df.show()
这应该提供带有扁平数据框的所需输出。
【讨论】:
以上是关于在 Pyspark 中展平 Json的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 pyspark 在 aws 胶水中展平嵌套 json 中的数组?