Pyspark 将数组列分解为带有滑动窗口的子列表

Posted

技术标签:

【中文标题】Pyspark 将数组列分解为带有滑动窗口的子列表【英文标题】:Pyspark explode array column into sublist with sliding window 【发布时间】:2021-08-16 15:56:09 【问题描述】:

我有一个 PySpark 行,我想根据列中的一个值将其分成更小的行。

给定一个df:

input_df = spark.createDataFrame([
    (2,[1,2,3,4,5],),
    ], ("id", "list"))

+---+------------+
| id|        list|
+---+------------+
|  2|[1, 2, 3, 4]|
+---+------------+

我想用固定大小的滑动窗口将每一行分成多个子集。结果 df 将是这样的:

output_df = spark.createDataFrame([
    (2, [0,0], 1), (2, [0,1], 2), (2, [1,2], 3), (2, [2,3], 4), (2, [3,4], 5),
    ], ("id", "past", "future"))

+---+------+------+
| id|  past|future|
+---+------+------+
|  2|[0, 0]|     1|
|  2|[0, 1]|     2|
|  2|[1, 2]|     3|
|  2|[2, 3]|     4|
|  2|[3, 4]|     5|
+---+------+------+

关于如何打破列表以使指针查看列表的每个元素的逻辑,使用前面的 N 个元素(在这种情况下 N=2)作为过去(如果没有足够的元素,则用 0 填充)并使用当前指针元素作为未来。对每个元素执行此操作会创建数据框。

我想不出用 pyspark 来做这件事的方法,我会用 pandas 数据框为每一行做一个迭代循环。有没有办法用 pyspark 做到这一点?

【问题讨论】:

【参考方案1】:

处理数组列的最佳方法是使用higher-order functions。

import pyspark.sql.functions as f

output_df = (input_df
             .withColumn('list', f.expr('TRANSFORM(list, (element, i) -> STRUCT(ARRAY(COALESCE(list[i - 2], 0), COALESCE(list[i - 1], 0)) AS past, element AS future))'))
             .selectExpr('id', 'inline(list)'))

+---+------+------+
|id |past  |future|
+---+------+------+
|2  |[0, 0]|1     |
|2  |[0, 1]|2     |
|2  |[1, 2]|3     |
|2  |[2, 3]|4     |
|2  |[3, 4]|5     |
+---+------+------+

更新

在创建表达式之前动态传递N

N = 2

expr = 'TRANSFORM(list, (element, i) -> STRUCT(TRANSFORM(sequence(N, 1), k -> COALESCE(list[i - k], 0)) AS past, element AS future))'.format(N=N)
output_df = (input_df
             .withColumn('list', f.expr(expr))
             .selectExpr('id', 'inline(list)'))

【讨论】:

嗨!非常接近我正在寻找的东西,是否有任何让它能够改变N?创建过去列的窗口大小。 是的,你想出了一个非常干净的解决方案 :) 谢谢!现在接受的答案,将在更大的数据集中尝试 tmw 早上,但应该可以工作。 很抱歉再次打扰,有没有办法将每个id 的最后一行提取到不同的数据框中?例如,这意味着将行 2, [3,4], 5 放入不同的数据帧中。 NVM!见pastebin.com/kFT5WmL3。我找到了一种方法,可以创建一个带有布尔值的新列来存储它是否是最后一行,然后我根据它进行过滤。如果您认为第一次使用这个一阶函数可能更优化,我愿意接受建议!看起来很整洁! 我建议您创建另一个问题,只是为了保留您的原始问题以及如何创建只有最后一个 id 分隔的数据框

以上是关于Pyspark 将数组列分解为带有滑动窗口的子列表的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark:将多个数组列拆分为行

PySpark:如何将列表分解为具有顺序命名的多列?

PySpark 根据名称将列表分解为多列

将列表转换为pyspark中的数据框列

PySpark 2.2中数组列的每个元素的子串

pyspark:将结构分解成列