如何将 JSON 格式的数据展平为 spark 数据框

Posted

技术标签:

【中文标题】如何将 JSON 格式的数据展平为 spark 数据框【英文标题】:How to flatten the JSON format data into spark dataframe 【发布时间】:2021-09-23 07:02:48 【问题描述】:

我正在尝试将 2 级嵌套 json 转换为 pyspark 数据框。 下面是我的 JSON 架构的样子:

在转换为产品结构的火花数据帧时,我总是得到空值,这是嵌套 JSON 的最后一级。

【问题讨论】:

这能回答你的问题吗? How to flatten a struct in a Spark dataframe? 我使用了一个函数,它可以使多级嵌套 json 变平。但出现错误:'ValueError: field rData: Length of object (1) does not match with length of fields (3)' 请分享您的代码和数据示例? 【参考方案1】:

您是否尝试过强制架构?

您可以试试这个,因为显然每个文件中都有不同的架构,因此执行正确的架构应该可以解决您的问题:

from pyspark.sql import types as T


schema = T.StructType(
    [
        T.StructField("b_key", T.IntegerType()),
        T.StructField("b_code", T.StringType()),
        T.StructField(
            "r_date",
            T.StructType(
                [
                    T.StructField("s_key", T.IntegerType()),
                    T.StructField("s_code", T.StringType()),
                    T.StructField(
                        "products",
                        T.StructType(
                            [
                                T.StructField("s_key", T.IntegerType()),
                                T.StructField("s_key", T.IntegerType()),
                                T.StructField("s_code", T.StringType()),
                                T.StructField("s_type", T.StringType()),
                                T.StructField("r_type", T.StringType()),
                                T.StructField("sl", T.DecimalType()),
                                T.StructField("sp", T.IntegerType()),
                            ]
                        ),
                    ),
                ]
            ),
        ),
    ]
)


df = spark.read.json("path/to/file.json", schema=schema)

从那里,你没有任何数组,所以你可以简单地select 嵌套列来展平。例如:

df.selct(
    "r_data.*"
)

这将使 r_data 结构列变平,最终得到 3 列。

【讨论】:

【参考方案2】:

如果结构如描述中所示是固定的,那么试试这个:

df.select(df.col("b_Code"), df.col("b_Key"),df.col("r_data.s_key"), df.col("r_data.s_Code"), df.col("r_data.products.s_key"), df.col("r_data.products.s_Code"), df.col("r_data.products.s_Type"), df.col("r_data.products.r_type"), df.col("r_data.products.sl"), df.col("r_data.products.sp"))

这里是一个函数,它可以使嵌套的 df 变平,而与 json 中的嵌套级别无关


from pyspark.sql.functions import col

def flatten_df(nested_df):
    stack = [((), nested_df)]
    columns = []

    while len(stack) > 0:
        parents, df = stack.pop()

        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]

        columns.extend(flat_cols)

        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))

    return nested_df.select(columns)

【讨论】:

试过这个功能。但出现错误:'ValueError: field rData: Length of object (1) does not match with length of fields (3)' 检查编辑的答案。错误似乎也是由于 json 损坏造成的。

以上是关于如何将 JSON 格式的数据展平为 spark 数据框的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Pandas 或 Spark Dataframe 展平嵌套的 Excel 数据?

使用具有相同名称的嵌套子属性展平 Spark JSON 数据框

如何展平具有大量子值的 Json 数据

如何将 JSON 格式的单行 Spark 数据框分解为多行?

如何展平多个嵌套的 json 并转换为数据框?

我应该在 Rails 应用程序的哪个位置放置将不同 JSON 字符串展平为类友好格式的功能?