在 PySpark 中分解 JSON 中不存在的密钥

Posted

技术标签:

【中文标题】在 PySpark 中分解 JSON 中不存在的密钥【英文标题】:Exploding a key not present in JSON in PySpark 【发布时间】:2021-11-26 08:14:07 【问题描述】:

我有一个 JSON 输入,其中包含一个数组,该数组将按如下方式展开:

new_df = df \
    .withColumn("x", explode_outer(col("x"))) \
        .select(
            col("x.p").alias("xp"),
            col("x.q").alias("xq"),
            col("x.r.l.g").alias("xrlg"),
            col("x.r.m.f").alias("xrmf"),
            col("x.r.n").alias("xrn"),
            col("x.r.o").alias("xro"),
            col("x.r.s").alias("xrs"),
    )

有时输入文件可能为空或可能没有 JSON 键“x”。在这种情况下,pyspark 代码无法显示cannot resolve 'x' given input columns: []

如果输入 JSON 中不存在此键,我是否可以保留此表的所有列并将它们全部填充为 NULL?

【问题讨论】:

您可以在读取数据时提供架构,Spark 会将缺失的列设置为 NULL。 【参考方案1】:

使用定义 here 的 has_column 函数简单地检查列是否存在于 df 中。 (下面写函数供参考)

from pyspark.sql.functions import lit, col, when
from pyspark.sql.types import *


def has_column(df, col):
    try:
        df[col]
        return True
    except AnalysisException:
        return False


if has_column(df, "x"):
    new_df = df.withColumn("x", explode_outer(col("x"))) \
        .select(
            col("x.p").alias("xp"),
            col("x.q").alias("xq"),
            col("x.r.l.g").alias("xrlg"),
            col("x.r.m.f").alias("xrmf"),
            col("x.r.n").alias("xrn"),
            col("x.r.o").alias("xro"),
            col("x.r.s").alias("xrs"))
else:
    new_df = df.withColumn("xp", lit(None).cast("string"))
               .withColumn("xq", lit(None).cast("string"))
               .withColumn("xrlg", lit(None).cast("string"))
               .withColumn("xrmf", lit(None).cast("string"))
               .withColumn("xrn", lit(None).cast("string"))
               .withColumn("xro", lit(None).cast("string"))
               .withColumn("xrs", lit(None).cast("string")) 

如果您还想检查内部 json 键值是否存在,您可以对每一列执行如下操作:

df.withColumn(
   "xp", 
   when(
       lit(has_column(df, "x.p")),
       col("x.p")
   ).otherwise(lit(None).cast("string")))

另一种解决方案是在读取 json 文件之前提供架构,如 hristo iliev 建议的那样

schema = StructType([
    StructField("x", StructType([
        StructField("p", StringType()),
        StructField("q", StringType()),
        ....
    ]))
])

df_record = spark.read.schema(schema).json("path/to/file.JSON",multiLine=True)

【讨论】:

感谢您的见解@Drashti Dobariya。但是,此 has_column 函数不能应用于此处的列“x”。如果这样做,将会出现如下错误:Generators are not supported when it's nested in expressions, but got: CASE WHEN true THEN generatorouter(explode(x)) ELSE CAST(NULL AS STRING) END。有没有办法将它应用到要爆炸的列上? 您可以添加您尝试过的确切代码吗? 实际上,我可以通过在您的explode 列答案中使用第一种方法和在此数组中嵌套JSON 的答案中使用第二种方法来解决它。谢谢! 嗨@DrashtiDobariya,如果可能的话,请你看看这个问题 - ***.com/questions/69475845/… 吗? 我尝试使用内联函数,但即使列不存在,它也会评估 when 条件。 .withColumn("abc", when( \ lit(has_column(df, "pqr.stu.i")), element_at("pqr.stu.i", -1)) \ .otherwise(lit(None).cast("string"))) 即使pqr.stu下没有i,也应该变成NULL了。但相反,它抛出了一个错误,提示时的后半部分也正在执行。【参考方案2】:

另一种选择是使用架构加载文件:

sqlContext.read.format('json').schema(schema_var).load(filename)

但它确实需要您在 schema_var 中提供完整的可能架构才能工作。

【讨论】:

以上是关于在 PySpark 中分解 JSON 中不存在的密钥的主要内容,如果未能解决你的问题,请参考以下文章

pyspark 中不存在忽略路径

如果 pyspark 中不存在,则从数据中选择键列为 null

如何使用 pyspark 2.1.0 选择另一个数据框中不存在的行?

在 Pandas 中分解多个列

pyspark 数据框如果不存在则添加一列

json解析中,解析的名称key在json中不存在,会报错吗,还是返回空对象