使用poseexplode分解带有索引的嵌套JSON
Posted
技术标签:
【中文标题】使用poseexplode分解带有索引的嵌套JSON【英文标题】:Explode nested JSON with Index using posexplode 【发布时间】:2020-07-05 03:06:32 【问题描述】:我正在使用下面的函数来分解深度嵌套的 JSON(具有嵌套的结构和数组)。
# Flatten nested df
def flatten_df(nested_df):
for col in nested_df.columns:
array_cols = [c[0] for c in nested_df.dtypes if c[1][:5] == 'array']
for col in array_cols:
nested_df =nested_df.withColumn(col, F.explode_outer(nested_df[col]))
nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
if len(nested_cols) == 0:
return nested_df
flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
flat_df = nested_df.select(flat_cols +
[F.col(nc+'.'+c).alias(nc+'_'+c)
for nc in nested_cols
for c in nested_df.select(nc+'.*').columns])
return flatten_df(flat_df)
我成功地爆炸了。但我还想在展开的数据框中添加元素的顺序或索引。所以在上面的代码中,我将explode_outer
函数替换为posexplode_outer
。但我收到以下错误
An error was encountered:
'The number of aliases supplied in the AS clause does not match the number of columns output by the UDTF expected 2 aliases'
我尝试将nested_df.withColumn
更改为nested_df.select
,但没有成功。谁能帮我分解嵌套的 json,但同时保持数组元素的顺序作为分解数据框中的一列。
【问题讨论】:
@请添加示例输入数据 【参考方案1】:将 json 数据作为数据框读取并创建视图或表。在 spark SQL 中,您可以使用别名引用的多个 laterviewexplode 方法。如果是 struct 类型的 json 数据结构,可以使用点来表示结构。 Level1.level2
【讨论】:
【参考方案2】:将nested_df =nested_df.withColumn(col, F.explode_outer(nested_df[col]))
替换为 nested_df = df.selectExpr("*", f"posexplode(col) as (position,col)").drop(col)
您可能需要编写一些逻辑来将列名替换为原始名称,但这应该很简单
【讨论】:
【参考方案3】:这个错误是因为posexplode_outer返回两列pos和col,所以不能和Column()一起使用。这可以在 select 中使用,如下面的代码所示
from pyspark.sql import functions as F
from pyspark.sql.window import Window
tst= sqlContext.createDataFrame([(1,7,80),(1,8,40),(1,5,100),(5,8,90),(7,6,50),(0,3,60)],schema=['col1','col2','col3'])
tst_new = tst.withColumn("arr",F.array(tst.columns))
expr = tst.columns
expr.append(F.posexplode_outer('arr'))
#%%
tst_explode = tst_new.select(*expr)
结果:
tst_explode.show()
+----+----+----+---+---+
|col1|col2|col3|pos|col|
+----+----+----+---+---+
| 1| 7| 80| 0| 1|
| 1| 7| 80| 1| 7|
| 1| 7| 80| 2| 80|
| 1| 8| 40| 0| 1|
| 1| 8| 40| 1| 8|
| 1| 8| 40| 2| 40|
| 1| 5| 100| 0| 1|
| 1| 5| 100| 1| 5|
| 1| 5| 100| 2|100|
| 5| 8| 90| 0| 5|
| 5| 8| 90| 1| 8|
| 5| 8| 90| 2| 90|
| 7| 6| 50| 0| 7|
| 7| 6| 50| 1| 6|
| 7| 6| 50| 2| 50|
| 0| 3| 60| 0| 0|
| 0| 3| 60| 1| 3|
| 0| 3| 60| 2| 60|
+----+----+----+---+---+
如果需要重命名列,可以使用 .withColumnRenamed() 函数
df_final=(tst_explode.withColumnRenamed('pos','position')).withColumnRenamed('col','column')
【讨论】:
【参考方案4】:您可以尝试 select 和 list-comprehension 来对现有代码中的 ArrayType 列进行分解:
for col in array_cols:
nested_df = nested_df.select([ F.posexplode_outer(col).alias(col+'_pos', col) if c == col else c for c in nested_df.columns ])
例子:
from pyspark.sql import functions as F
df = spark.createDataFrame([(1,"n1", ["a", "b", "c"]),(2,"n2", ["foo", "bar"])],["id", "name", "vals"])
#+---+----+----------+
#| id|name| vals|
#+---+----+----------+
#| 1| n1| [a, b, c]|
#| 2| n2|[foo, bar]|
#+---+----+----------+
col = "vals"
df.select([F.posexplode_outer(col).alias(col+'_pos', col) if c == col else c for c in df.columns]).show()
#+---+----+--------+----+
#| id|name|vals_pos|vals|
#+---+----+--------+----+
#| 1| n1| 0| a|
#| 1| n1| 1| b|
#| 1| n1| 2| c|
#| 2| n2| 0| foo|
#| 2| n2| 1| bar|
#+---+----+--------+----+
【讨论】:
以上是关于使用poseexplode分解带有索引的嵌套JSON的主要内容,如果未能解决你的问题,请参考以下文章