基于另一列的滞后窗口函数
Posted
技术标签:
【中文标题】基于另一列的滞后窗口函数【英文标题】:Window function with lag based on another column 【发布时间】:2021-03-03 18:35:42 【问题描述】:我有以下 Spark DataFrame:
id | month | column_1 | column_2 |
---|---|---|---|
A | 1 | 100 | 0 |
A | 2 | 200 | 1 |
A | 3 | 800 | 2 |
A | 4 | 1500 | 3 |
A | 5 | 1200 | 0 |
A | 6 | 1600 | 1 |
A | 7 | 2500 | 2 |
A | 8 | 2800 | 3 |
A | 9 | 3000 | 4 |
我想创建一个新列,我们根据 column_2 给出的动态滞后将其称为“dif_column1”。期望的输出是:
id | month | column_1 | column_2 | dif_column1 |
---|---|---|---|---|
A | 1 | 100 | 0 | 0 |
A | 2 | 200 | 1 | 100 |
A | 3 | 800 | 2 | 700 |
A | 4 | 1500 | 3 | 1400 |
A | 5 | 1200 | 0 | 0 |
A | 6 | 1600 | 1 | 400 |
A | 7 | 2500 | 2 | 1300 |
A | 8 | 2800 | 3 | 1600 |
A | 9 | 3000 | 4 | 1800 |
我曾尝试使用 lag 函数,但显然我只能将整数与 lag 函数一起使用,所以它不起作用:
w = Window.partitionBy("id")
sdf = sdf.withColumn("dif_column1", F.col("column_1") - F.lag("column_1",F.col("column_2")).over(w))
【问题讨论】:
【参考方案1】:您可以添加一个行号列,并根据行号和column_2中定义的滞后进行自连接:
from pyspark.sql import functions as F, Window
w = Window.partitionBy("id").orderBy("month")
df1 = df.withColumn('rn', F.row_number().over(w))
df2 = df1.alias('t1').join(
df1.alias('t2'),
F.expr('(t1.id = t2.id) and (t1.rn = t2.rn + t1.column_2)'),
'left'
).selectExpr(
't1.*',
't1.column_1 - t2.column_1 as dif_column1'
).drop('rn')
df2.show()
+---+-----+--------+--------+-----------+
| id|month|column_1|column_2|dif_column1|
+---+-----+--------+--------+-----------+
| A| 1| 100| 0| 0|
| A| 2| 200| 1| 100|
| A| 3| 800| 2| 700|
| A| 4| 1500| 3| 1400|
| A| 5| 1200| 0| 0|
| A| 6| 1600| 1| 400|
| A| 7| 2500| 2| 1300|
| A| 8| 2800| 3| 1600|
| A| 9| 3000| 4| 1800|
+---+-----+--------+--------+-----------+
【讨论】:
感谢您的帮助,它确实有效。我只是想知道是否有另一种方法而不必使用连接,因为我的数据框有数百万行和数百列,并且操作很慢。 抱歉,我刚刚意识到连接条件中缺少某些内容 - 检查编辑后的答案。恐怕需要加入。我想与滞后窗口功能相比,加入并不太贵,因为我觉得它们本质上是在做同样的事情。 你可能是对的。非常感谢您的帮助,真的很有帮助!以上是关于基于另一列的滞后窗口函数的主要内容,如果未能解决你的问题,请参考以下文章