Spark:如何使用嵌套数组转置和分解列

Posted

技术标签:

【中文标题】Spark:如何使用嵌套数组转置和分解列【英文标题】:Spark: How to transpose and explode columns with nested arrays 【发布时间】:2021-10-02 15:16:53 【问题描述】:

我应用了以下问题中的算法(在 NOTE 中)来转置和分解嵌套的 spark 数据帧。

当我定义 cols = ['a', 'b'] 时,我得到空数据框,但是当我定义 cols = ['a'] 时,我得到 a 列的转换数据框。有关更多详细信息,请参阅下面的当前代码部分。任何帮助将不胜感激。

我正在寻找所需的输出 2(Transpose 和 Explode),但即使是所需输出 1(Transpose)的示例也会非常有用。

注意:这是突出问题的最小示例,实际上数据帧架构和数组长度会有所不同,如示例 Pyspark: How to flatten nested arrays by merging values in spark

输入df:

+---+------------------+--------+
| id|                 a|       b|
+---+------------------+--------+
|  1|[1, 1, 11, 11]|    null|
|  2|              null|[2, 2]|
+---+------------------+--------+


root
 |-- id: long (nullable = true)
 |-- a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- b: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true

所需的输出 1 (transpose_df):

+---+------+-------------------+
| id| cols |       arrays      |
+---+------+-------------------+
|  1|  a   | [1, 1, 11, 11]|
|  2|  b   | [2, 2]          |
+---+------+-------------------+

所需的输出 2 (explode_df):

+---+----+----+---+
| id|cols|date|val|
+---+----+----+---+
|  1|   a|   1|  1|
|  1|   a|  11| 11|
|  2|   b|   2|  2|
+---+----+----+---+

当前代码:

import pyspark.sql.functions as f

df = spark.read.json(sc.parallelize([
  """"id":1,"a":["date":1,"val":1,"date":11,"val":11]""",
  """"id":2,"b":["date":2,"val":2]"""]))

cols = ['a', 'b']

expressions = [f.expr('TRANSFORM(col, el -> STRUCT("col" AS cols, el.date, el.val))'.format(col=col)) for col in cols ]

transpose_df = df.withColumn('arrays', f.flatten(f.array(*expressions)))
             
explode_df = transpose_df.selectExpr('id', 'inline(arrays)')

explode_df.show()

目前的成果

+---+----+----+---+
| id|cols|date|val|
+---+----+----+---+
+---+----+----+---+

【问题讨论】:

【参考方案1】:

对于第一步,stack 可能是比transpose 更好的选择。


expr = f"stack(len(cols)," + \
    ",".join([f"'c',c" for c in cols]) + \
    ")"
#expr = stack(2,'a',a,'b',b)

transpose_df = df.selectExpr("id", expr) \
    .withColumnRenamed("col0", "cols") \
    .withColumnRenamed("col1", "arrays") \
    .filter("not arrays is null")

explode_df = transpose_df.selectExpr('id', 'cols', 'inline(arrays)')

【讨论】:

非常感谢它适用于具有静态字段的数组(dateval)。如果列中的数组具有随机变化的动态字段,我又提出了一个问题。任何帮助将不胜感激。 ***.com/questions/69423246/…

以上是关于Spark:如何使用嵌套数组转置和分解列的主要内容,如果未能解决你的问题,请参考以下文章

SQL 如何将数据转置和分组到静态列中? [复制]

转置和聚合Oracle列数据

转置和聚合 Oracle 列数据

SQL转置和添加列

如何将StructType从Spark中的json数据框分解为行而不是列

数据转置和连接,如何遵循内连接条件?