基于另一列的滞后窗口函数

Posted

技术标签:

【中文标题】基于另一列的滞后窗口函数【英文标题】:Window function with lag based on another column 【发布时间】:2021-03-03 18:35:42 【问题描述】:

我有以下 Spark DataFrame:

id month column_1 column_2
A 1 100 0
A 2 200 1
A 3 800 2
A 4 1500 3
A 5 1200 0
A 6 1600 1
A 7 2500 2
A 8 2800 3
A 9 3000 4

我想创建一个新列,我们根据 column_2 给出的动态滞后将其称为“dif_column1”。期望的输出是:

id month column_1 column_2 dif_column1
A 1 100 0 0
A 2 200 1 100
A 3 800 2 700
A 4 1500 3 1400
A 5 1200 0 0
A 6 1600 1 400
A 7 2500 2 1300
A 8 2800 3 1600
A 9 3000 4 1800

我曾尝试使用 lag 函数,但显然我只能将整数与 lag 函数一起使用,所以它不起作用:

w = Window.partitionBy("id")
sdf = sdf.withColumn("dif_column1", F.col("column_1") - F.lag("column_1",F.col("column_2")).over(w))

【问题讨论】:

【参考方案1】:

您可以添加一个行号列,并根据行号和column_2中定义的滞后进行自连接:

from pyspark.sql import functions as F, Window

w = Window.partitionBy("id").orderBy("month")

df1 = df.withColumn('rn', F.row_number().over(w)) 

df2 = df1.alias('t1').join(
    df1.alias('t2'),
    F.expr('(t1.id = t2.id) and (t1.rn = t2.rn + t1.column_2)'),
    'left'
).selectExpr(
    't1.*',
    't1.column_1 - t2.column_1 as dif_column1'
).drop('rn')

df2.show()
+---+-----+--------+--------+-----------+
| id|month|column_1|column_2|dif_column1|
+---+-----+--------+--------+-----------+
|  A|    1|     100|       0|          0|
|  A|    2|     200|       1|        100|
|  A|    3|     800|       2|        700|
|  A|    4|    1500|       3|       1400|
|  A|    5|    1200|       0|          0|
|  A|    6|    1600|       1|        400|
|  A|    7|    2500|       2|       1300|
|  A|    8|    2800|       3|       1600|
|  A|    9|    3000|       4|       1800|
+---+-----+--------+--------+-----------+

【讨论】:

感谢您的帮助,它确实有效。我只是想知道是否有另一种方法而不必使用连接,因为我的数据框有数百万行和数百列,并且操作很慢。 抱歉,我刚刚意识到连接条件中缺少某些内容 - 检查编辑后的答案。恐怕需要加入。我想与滞后窗口功能相比,加入并不太贵,因为我觉得它们本质上是在做同样的事情。 你可能是对的。非常感谢您的帮助,真的很有帮助!

以上是关于基于另一列的滞后窗口函数的主要内容,如果未能解决你的问题,请参考以下文章

将 FIRST_VALUE 与基于另一列的条件一起使用

使用滞后函数访问当前行值

使用窗口函数根据另一列从列中检索值

窗口函数:仅对另一列中的不同值求和

如何计算另一列中特定值的列的平均值?

SQL - 使用窗口函数创建滞后变量