Spark:如何使用嵌套数组转置和分解列
Posted
技术标签:
【中文标题】Spark:如何使用嵌套数组转置和分解列【英文标题】:Spark: How to transpose and explode columns with nested arrays 【发布时间】:2021-10-02 15:16:53 【问题描述】:我应用了以下问题中的算法(在 NOTE 中)来转置和分解嵌套的 spark 数据帧。
当我定义 cols = ['a', 'b']
时,我得到空数据框,但是当我定义 cols = ['a']
时,我得到 a
列的转换数据框。有关更多详细信息,请参阅下面的当前代码部分。任何帮助将不胜感激。
我正在寻找所需的输出 2(Transpose 和 Explode),但即使是所需输出 1(Transpose)的示例也会非常有用。
注意:这是突出问题的最小示例,实际上数据帧架构和数组长度会有所不同,如示例 Pyspark: How to flatten nested arrays by merging values in spark
输入df:
+---+------------------+--------+
| id| a| b|
+---+------------------+--------+
| 1|[1, 1, 11, 11]| null|
| 2| null|[2, 2]|
+---+------------------+--------+
root
|-- id: long (nullable = true)
|-- a: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- val: long (nullable = true)
|-- b: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- val: long (nullable = true
所需的输出 1 (transpose_df):
+---+------+-------------------+
| id| cols | arrays |
+---+------+-------------------+
| 1| a | [1, 1, 11, 11]|
| 2| b | [2, 2] |
+---+------+-------------------+
所需的输出 2 (explode_df):
+---+----+----+---+
| id|cols|date|val|
+---+----+----+---+
| 1| a| 1| 1|
| 1| a| 11| 11|
| 2| b| 2| 2|
+---+----+----+---+
当前代码:
import pyspark.sql.functions as f
df = spark.read.json(sc.parallelize([
""""id":1,"a":["date":1,"val":1,"date":11,"val":11]""",
""""id":2,"b":["date":2,"val":2]"""]))
cols = ['a', 'b']
expressions = [f.expr('TRANSFORM(col, el -> STRUCT("col" AS cols, el.date, el.val))'.format(col=col)) for col in cols ]
transpose_df = df.withColumn('arrays', f.flatten(f.array(*expressions)))
explode_df = transpose_df.selectExpr('id', 'inline(arrays)')
explode_df.show()
目前的成果
+---+----+----+---+
| id|cols|date|val|
+---+----+----+---+
+---+----+----+---+
【问题讨论】:
【参考方案1】:对于第一步,stack 可能是比transpose
更好的选择。
expr = f"stack(len(cols)," + \
",".join([f"'c',c" for c in cols]) + \
")"
#expr = stack(2,'a',a,'b',b)
transpose_df = df.selectExpr("id", expr) \
.withColumnRenamed("col0", "cols") \
.withColumnRenamed("col1", "arrays") \
.filter("not arrays is null")
explode_df = transpose_df.selectExpr('id', 'cols', 'inline(arrays)')
【讨论】:
非常感谢它适用于具有静态字段的数组(date
,val
)。如果列中的数组具有随机变化的动态字段,我又提出了一个问题。任何帮助将不胜感激。 ***.com/questions/69423246/…以上是关于Spark:如何使用嵌套数组转置和分解列的主要内容,如果未能解决你的问题,请参考以下文章