PySpark:如何将列表分解为具有顺序命名的多列?

Posted

技术标签:

【中文标题】PySpark:如何将列表分解为具有顺序命名的多列?【英文标题】:PySpark: How to explode list into multiple columns with sequential naming? 【发布时间】:2021-07-08 14:13:08 【问题描述】:

我有以下 PySpark DF:

+--------+--------------------+--------------------+
|      id|           resoFacts|             heating|
+--------+--------------------+--------------------+
|90179090|[, [No Handicap A...|[Central Heat, Fo...|
+--------+--------------------+--------------------+

由以下人员创建:

(data_filt
     .where(col('id') == '90179090')
     .withColumn('heating', col("resoFacts").getField('heating')))

我想创建一个 DF,将heating 中的列表扩展为按顺序命名的列,如下所示:

+--------------+------------+----------+----------+---------+
|  id          |heating_1   |heating_2 | heating_3|heating_4|
+--------------+------------+----------+----------+---------+
|  90179090    |Central Heat|Forced Air|      Gas |Heat Pump|
+--------------+------------+----------+----------+---------+

我最远的尝试产生了以下 DF:

+---+------------+----------+----+---------+
|pos|Central Heat|Forced Air| Gas|Heat Pump|
+---+------------+----------+----+---------+
|  1|        null|Forced Air|null|     null|
|  3|        null|      null| Gas|     null|
|  2|        null|      null|null|Heat Pump|
|  0|Central Heat|      null|null|     null|
+---+------------+----------+----+---------+

使用此代码:

(data_filt
     .where(col('id') == '90179090')
     .withColumn('heating', col("resoFacts").getField('heating'))
     .select("heating", posexplode("heating"))
     .groupBy('pos').pivot('col').agg(first('col')))

我可能对以groupBy 开头的行做错了。有人有想法吗?

【问题讨论】:

数组中总是有 4 个元素? 【参考方案1】:

如果数组中只有 4 个元素,则可以这样做:

from pyspark.sql import functions as F

data_filt.select(
    "id",
    *(
        F.col("heating").getItem(i).alias(f"heating_i+1")
        for i in range(4)
    )
)

如果您有更多元素,请增加 range

【讨论】:

(1) 元素的数量是可变的,并且 (2) 需要在数百万行上执行此操作(因此也应该是高性能的) @mmz 它是高性能的,比使用 group by 更高效,因为它是一个简单的 map。你最多有多少个元素?你知道吗? 最大为 10 个元素。 fwiw,这也将跨越多个其他列 @mmz 请在我的解决方案不起作用的地方添加更多详细信息,以便我可以详细说明...目前,我看不出您有任何不应该使用此解决方案的理由。 @mmz 这是一个高效的解决方案。如果您需要使解决方案更具动态性,您可以首先找到数据集的最大范围,然后将其添加到上述解决方案中,例如 for i in range(max_len)。然后数组项length < max_len 将具有max_len - length null

以上是关于PySpark:如何将列表分解为具有顺序命名的多列?的主要内容,如果未能解决你的问题,请参考以下文章

PySpark 将 JSON 字符串分解为多列

如何将 map_keys() 中的值拆分为 PySpark 中的多列

将列表的列拆分为同一 PySpark 数据框中的多列

Pyspark 将数组列分解为带有滑动窗口的子列表

具有列顺序的响应式多列列表

PySpark 中具有多列的日期算术