Pyspark 将数组列分解为带有滑动窗口的子列表
Posted
技术标签:
【中文标题】Pyspark 将数组列分解为带有滑动窗口的子列表【英文标题】:Pyspark explode array column into sublist with sliding window 【发布时间】:2021-08-16 15:56:09 【问题描述】:我有一个 PySpark 行,我想根据列中的一个值将其分成更小的行。
给定一个df:
input_df = spark.createDataFrame([
(2,[1,2,3,4,5],),
], ("id", "list"))
+---+------------+
| id| list|
+---+------------+
| 2|[1, 2, 3, 4]|
+---+------------+
我想用固定大小的滑动窗口将每一行分成多个子集。结果 df 将是这样的:
output_df = spark.createDataFrame([
(2, [0,0], 1), (2, [0,1], 2), (2, [1,2], 3), (2, [2,3], 4), (2, [3,4], 5),
], ("id", "past", "future"))
+---+------+------+
| id| past|future|
+---+------+------+
| 2|[0, 0]| 1|
| 2|[0, 1]| 2|
| 2|[1, 2]| 3|
| 2|[2, 3]| 4|
| 2|[3, 4]| 5|
+---+------+------+
关于如何打破列表以使指针查看列表的每个元素的逻辑,使用前面的 N 个元素(在这种情况下 N=2)作为过去(如果没有足够的元素,则用 0 填充)并使用当前指针元素作为未来。对每个元素执行此操作会创建数据框。
我想不出用 pyspark 来做这件事的方法,我会用 pandas 数据框为每一行做一个迭代循环。有没有办法用 pyspark 做到这一点?
【问题讨论】:
【参考方案1】:处理数组列的最佳方法是使用higher-order functions。
import pyspark.sql.functions as f
output_df = (input_df
.withColumn('list', f.expr('TRANSFORM(list, (element, i) -> STRUCT(ARRAY(COALESCE(list[i - 2], 0), COALESCE(list[i - 1], 0)) AS past, element AS future))'))
.selectExpr('id', 'inline(list)'))
+---+------+------+
|id |past |future|
+---+------+------+
|2 |[0, 0]|1 |
|2 |[0, 1]|2 |
|2 |[1, 2]|3 |
|2 |[2, 3]|4 |
|2 |[3, 4]|5 |
+---+------+------+
更新
在创建表达式之前动态传递N
:
N = 2
expr = 'TRANSFORM(list, (element, i) -> STRUCT(TRANSFORM(sequence(N, 1), k -> COALESCE(list[i - k], 0)) AS past, element AS future))'.format(N=N)
output_df = (input_df
.withColumn('list', f.expr(expr))
.selectExpr('id', 'inline(list)'))
【讨论】:
嗨!非常接近我正在寻找的东西,是否有任何让它能够改变N?创建过去列的窗口大小。 是的,你想出了一个非常干净的解决方案 :) 谢谢!现在接受的答案,将在更大的数据集中尝试 tmw 早上,但应该可以工作。 很抱歉再次打扰,有没有办法将每个id
的最后一行提取到不同的数据框中?例如,这意味着将行 2, [3,4], 5
放入不同的数据帧中。
NVM!见pastebin.com/kFT5WmL3。我找到了一种方法,可以创建一个带有布尔值的新列来存储它是否是最后一行,然后我根据它进行过滤。如果您认为第一次使用这个一阶函数可能更优化,我愿意接受建议!看起来很整洁!
我建议您创建另一个问题,只是为了保留您的原始问题以及如何创建只有最后一个 id 分隔的数据框以上是关于Pyspark 将数组列分解为带有滑动窗口的子列表的主要内容,如果未能解决你的问题,请参考以下文章