在 PySpark 中分解 JSON 中不存在的密钥
Posted
技术标签:
【中文标题】在 PySpark 中分解 JSON 中不存在的密钥【英文标题】:Exploding a key not present in JSON in PySpark 【发布时间】:2021-11-26 08:14:07 【问题描述】:我有一个 JSON 输入,其中包含一个数组,该数组将按如下方式展开:
new_df = df \
.withColumn("x", explode_outer(col("x"))) \
.select(
col("x.p").alias("xp"),
col("x.q").alias("xq"),
col("x.r.l.g").alias("xrlg"),
col("x.r.m.f").alias("xrmf"),
col("x.r.n").alias("xrn"),
col("x.r.o").alias("xro"),
col("x.r.s").alias("xrs"),
)
有时输入文件可能为空或可能没有 JSON 键“x”。在这种情况下,pyspark 代码无法显示cannot resolve 'x' given input columns: []
。
如果输入 JSON 中不存在此键,我是否可以保留此表的所有列并将它们全部填充为 NULL?
【问题讨论】:
您可以在读取数据时提供架构,Spark 会将缺失的列设置为 NULL。 【参考方案1】:使用定义 here 的 has_column 函数简单地检查列是否存在于 df 中。 (下面写函数供参考)
from pyspark.sql.functions import lit, col, when
from pyspark.sql.types import *
def has_column(df, col):
try:
df[col]
return True
except AnalysisException:
return False
if has_column(df, "x"):
new_df = df.withColumn("x", explode_outer(col("x"))) \
.select(
col("x.p").alias("xp"),
col("x.q").alias("xq"),
col("x.r.l.g").alias("xrlg"),
col("x.r.m.f").alias("xrmf"),
col("x.r.n").alias("xrn"),
col("x.r.o").alias("xro"),
col("x.r.s").alias("xrs"))
else:
new_df = df.withColumn("xp", lit(None).cast("string"))
.withColumn("xq", lit(None).cast("string"))
.withColumn("xrlg", lit(None).cast("string"))
.withColumn("xrmf", lit(None).cast("string"))
.withColumn("xrn", lit(None).cast("string"))
.withColumn("xro", lit(None).cast("string"))
.withColumn("xrs", lit(None).cast("string"))
如果您还想检查内部 json 键值是否存在,您可以对每一列执行如下操作:
df.withColumn(
"xp",
when(
lit(has_column(df, "x.p")),
col("x.p")
).otherwise(lit(None).cast("string")))
另一种解决方案是在读取 json 文件之前提供架构,如 hristo iliev 建议的那样
schema = StructType([
StructField("x", StructType([
StructField("p", StringType()),
StructField("q", StringType()),
....
]))
])
df_record = spark.read.schema(schema).json("path/to/file.JSON",multiLine=True)
【讨论】:
感谢您的见解@Drashti Dobariya。但是,此 has_column 函数不能应用于此处的列“x”。如果这样做,将会出现如下错误:Generators are not supported when it's nested in expressions, but got: CASE WHEN true THEN generatorouter(explode(x)) ELSE CAST(NULL AS STRING) END
。有没有办法将它应用到要爆炸的列上?
您可以添加您尝试过的确切代码吗?
实际上,我可以通过在您的explode 列答案中使用第一种方法和在此数组中嵌套JSON 的答案中使用第二种方法来解决它。谢谢!
嗨@DrashtiDobariya,如果可能的话,请你看看这个问题 - ***.com/questions/69475845/… 吗?
我尝试使用内联函数,但即使列不存在,它也会评估 when 条件。 .withColumn("abc", when( \ lit(has_column(df, "pqr.stu.i")), element_at("pqr.stu.i", -1)) \ .otherwise(lit(None).cast("string")))
即使pqr.stu下没有i,也应该变成NULL了。但相反,它抛出了一个错误,提示时的后半部分也正在执行。【参考方案2】:
另一种选择是使用架构加载文件:
sqlContext.read.format('json').schema(schema_var).load(filename)
但它确实需要您在 schema_var 中提供完整的可能架构才能工作。
【讨论】:
以上是关于在 PySpark 中分解 JSON 中不存在的密钥的主要内容,如果未能解决你的问题,请参考以下文章
如果 pyspark 中不存在,则从数据中选择键列为 null