在 PySpark 中的窗口上获取与某些条件匹配的第一行

Posted

技术标签:

【中文标题】在 PySpark 中的窗口上获取与某些条件匹配的第一行【英文标题】:Get the first row that matches some condition over a window in PySpark 【发布时间】:2020-11-14 03:50:28 【问题描述】:

举个例子,假设我们有如下的用户操作流:

from pyspark.sql import *
spark = SparkSession.builder.appName('test').master('local[8]').getOrCreate()

df = spark.sparkContext.parallelize([
    Row(user=1, action=1, time=1),
    Row(user=1, action=1, time=2),
    Row(user=2, action=1, time=3),
    Row(user=1, action=2, time=4),
    Row(user=2, action=2, time=5),
    Row(user=2, action=2, time=6),
    Row(user=1, action=1, time=7),
    Row(user=2, action=1, time=8),
]).toDF()
df.show()

数据框如下所示:

+----+------+----+
|user|action|time|
+----+------+----+
|   1|     1|   1|
|   1|     1|   2|
|   2|     1|   3|
|   1|     2|   4|
|   2|     2|   5|
|   2|     2|   6|
|   1|     1|   7|
|   2|     1|   8|
+----+------+----+

然后,我想在每一行添加一列next_alt_time,给出用户在以下行中更改操作类型的时间。对于上面的输入,输出应该是:

+----+------+----+-------------+
|user|action|time|next_alt_time|
+----+------+----+-------------+
|   1|     1|   1|            4|
|   1|     1|   2|            4|
|   2|     1|   3|            5|
|   1|     2|   4|            7|
|   2|     2|   5|            8|
|   2|     2|   6|            8|
|   1|     1|   7|         null|
|   2|     1|   8|         null|
+----+------+----+-------------+

我知道我可以创建这样的窗口:

wnd = Window().partitionBy('user').orderBy('time').rowsBetween(1, Window.unboundedFollowing)

但是我不知道如何在窗口上施加条件并在上面定义的窗口上选择与当前行具有不同操作的第一行。

【问题讨论】:

预期输出 ?? 【参考方案1】:

这里是如何做到这一点。 Spark 无法保持数据帧的顺序,但如果您逐行检查,您可以确认它给出了您预期的答案:

from pyspark.sql import Row
from pyspark.sql.window import Window
import pyspark.sql.functions as F

df = spark.sparkContext.parallelize([
    Row(user=1, action=1, time=1),
    Row(user=1, action=1, time=2),
    Row(user=2, action=1, time=3),
    Row(user=1, action=2, time=4),
    Row(user=2, action=2, time=5),
    Row(user=2, action=2, time=6),
    Row(user=1, action=1, time=7),
    Row(user=2, action=1, time=8),
]).toDF()

win = Window().partitionBy('user').orderBy('time')

df = df.withColumn('new_action', F.lag('action').over(win) != F.col('action'))
df = df.withColumn('new_action_time', F.when(F.col('new_action'), F.col('time')))
df = df.withColumn('next_alt_time', F.first('new_action', ignorenulls=True).over(win.rowsBetween(1, Window.unboundedFollowing)))

df.show()

+----+------+----+----------+---------------+-------------+
|user|action|time|new_action|new_action_time|next_alt_time|
+----+------+----+----------+---------------+-------------+
|   1|     1|   1|      null|           null|            4|
|   1|     1|   2|     false|           null|            4|
|   1|     2|   4|      true|              4|            7|
|   1|     1|   7|      true|              7|         null|
|   2|     1|   3|      null|           null|            5|
|   2|     2|   5|      true|              5|            8|
|   2|     2|   6|     false|           null|            8|
|   2|     1|   8|      true|              8|         null|
+----+------+----+----------+---------------+-------------+

【讨论】:

非常感谢。重新排序的数据帧实际上使计算更加清晰。我没有想过使用延迟,但现在我发现使用它来检测变化是很自然的。

以上是关于在 PySpark 中的窗口上获取与某些条件匹配的第一行的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark Dataframe (Pyspark) 中提取与特定条件匹配的第一个“行集”

PySpark 中的窗口函数和条件过滤器

Pyspark - 获取具有条件的列的累积总和

当窗口/分区使用前向填充时,向 pyspark sql 中的 last() 函数添加条件

在pyspark中加入2个表,多个条件,左连接?

如果存在与数据中的某些条件匹配的另一行,则从pandas DataFrame中查找行