如何将 JSON 格式的数据展平为 spark 数据框
Posted
技术标签:
【中文标题】如何将 JSON 格式的数据展平为 spark 数据框【英文标题】:How to flatten the JSON format data into spark dataframe 【发布时间】:2021-09-23 07:02:48 【问题描述】:我正在尝试将 2 级嵌套 json 转换为 pyspark 数据框。 下面是我的 JSON 架构的样子:
在转换为产品结构的火花数据帧时,我总是得到空值,这是嵌套 JSON 的最后一级。
【问题讨论】:
这能回答你的问题吗? How to flatten a struct in a Spark dataframe? 我使用了一个函数,它可以使多级嵌套 json 变平。但出现错误:'ValueError: field rData: Length of object (1) does not match with length of fields (3)' 请分享您的代码和数据示例? 【参考方案1】:您是否尝试过强制架构?
您可以试试这个,因为显然每个文件中都有不同的架构,因此执行正确的架构应该可以解决您的问题:
from pyspark.sql import types as T
schema = T.StructType(
[
T.StructField("b_key", T.IntegerType()),
T.StructField("b_code", T.StringType()),
T.StructField(
"r_date",
T.StructType(
[
T.StructField("s_key", T.IntegerType()),
T.StructField("s_code", T.StringType()),
T.StructField(
"products",
T.StructType(
[
T.StructField("s_key", T.IntegerType()),
T.StructField("s_key", T.IntegerType()),
T.StructField("s_code", T.StringType()),
T.StructField("s_type", T.StringType()),
T.StructField("r_type", T.StringType()),
T.StructField("sl", T.DecimalType()),
T.StructField("sp", T.IntegerType()),
]
),
),
]
),
),
]
)
df = spark.read.json("path/to/file.json", schema=schema)
从那里,你没有任何数组,所以你可以简单地select
嵌套列来展平。例如:
df.selct(
"r_data.*"
)
这将使 r_data 结构列变平,最终得到 3 列。
【讨论】:
【参考方案2】:如果结构如描述中所示是固定的,那么试试这个:
df.select(df.col("b_Code"), df.col("b_Key"),df.col("r_data.s_key"), df.col("r_data.s_Code"), df.col("r_data.products.s_key"), df.col("r_data.products.s_Code"), df.col("r_data.products.s_Type"), df.col("r_data.products.r_type"), df.col("r_data.products.sl"), df.col("r_data.products.sp"))
这里是一个函数,它可以使嵌套的 df 变平,而与 json 中的嵌套级别无关
from pyspark.sql.functions import col
def flatten_df(nested_df):
stack = [((), nested_df)]
columns = []
while len(stack) > 0:
parents, df = stack.pop()
flat_cols = [
col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
for c in df.dtypes
if c[1][:6] != "struct"
]
nested_cols = [
c[0]
for c in df.dtypes
if c[1][:6] == "struct"
]
columns.extend(flat_cols)
for nested_col in nested_cols:
projected_df = df.select(nested_col + ".*")
stack.append((parents + (nested_col,), projected_df))
return nested_df.select(columns)
【讨论】:
试过这个功能。但出现错误:'ValueError: field rData: Length of object (1) does not match with length of fields (3)' 检查编辑的答案。错误似乎也是由于 json 损坏造成的。以上是关于如何将 JSON 格式的数据展平为 spark 数据框的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Pandas 或 Spark Dataframe 展平嵌套的 Excel 数据?
使用具有相同名称的嵌套子属性展平 Spark JSON 数据框