有条件的 PySpark 窗口

Posted

技术标签:

【中文标题】有条件的 PySpark 窗口【英文标题】:PySpark window with condition 【发布时间】:2021-02-15 12:16:31 【问题描述】:

我有一个包含应用程序日志的数据集,显示某个应用程序的启动或关闭时间。有时,相关事件可能完全从日志中丢失。我想将每个应用程序的开头与相关的结束事件(如果存在)相匹配。

这是一个说明性数据集:

import pyspark.sql.functions as F
from pyspark.sql import Window

df = spark.createDataFrame([['Group1', 'Logon', 'Name1', '2021-02-05T19:03:00.000+0000'],
                            ['Group1', 'Start', 'Name1', '2021-02-05T19:04:00.000+0000'],
                            ['Group1', 'Start', 'Name1', '2021-02-05T19:05:00.000+0000'],
                            ['Group1', 'End', 'Name1', '2021-02-05T19:06:00.000+0000'],
                            ['Group1', 'End', 'Name3', '2021-02-05T19:06:01.000+0000'],
                            ['Group1', 'End', 'Name1', '2021-02-05T19:07:00.000+0000'],
                            ['Group2', 'Start', 'Name1', '2021-02-05T19:04:00.000+0000'],
                            ['Group2', 'Start', 'Name1', '2021-02-05T19:05:00.000+0000'],
                            ['Group2', 'Start', 'Name2', '2021-02-05T19:06:00.000+0000'],
                            ['Group2', 'End', 'Name1', '2021-02-05T19:07:00.000+0000'],
                            ['Group2', 'Close', 'Name1', '2021-02-05T19:07:00.000+0000'],
                            ], ['group', 'type', 'name', 'time'])

df = df.withColumn('time', F.col('time').cast('timestamp'))

对于每个组,我想为每个“开始”和“结束”事件添加一个通用标识符,如果它们具有相同的“名称”。换句话说,对于每个“开始”事件,我想找到第一个尚未与另一个“开始”事件匹配的“结束”事件

预期的结果可能如下图所示:

我不介意标识符(即“my_group”)是 ID、时间戳还是跨组单调递增。我只是希望能够匹配每个组内的相关事件。

我的尝试

我考虑使用窗口函数来识别“开始”事件的结束时间和“结束”事件的开始时间。但是,我不能仅限于搜索“结束”事件(分别是“开始”事件)。此外,我无法应用上述查找尚未与另一个“开始”事件匹配的第一个“结束”事件的逻辑。

这是我的代码:

app_session_window_down = Window.partitionBy('group', "name").orderBy(F.col("time").cast('long')).rangeBetween(1, Window.unboundedFollowing) #search in the future
app_session_window_up = Window.partitionBy('group', "name").orderBy(F.col("time").cast('long')).rangeBetween(Window.unboundedPreceding, -1) #search in the past

df = df.withColumn("app_time_end", F.when((F.col("type") == 'Start'), F.first(F.col('time'), ignorenulls=True).over(app_session_window_down)).otherwise(F.lit('None')))\
    .withColumn("app_time_start", F.when((F.col("type") == 'End'), F.last(F.col('time'), ignorenulls=True).over(app_session_window_up)).otherwise(F.col('app_time_end')))

给出:

这与我想要实现的目标相去甚远。有什么提示吗?

【问题讨论】:

【参考方案1】:

内联 cmets 中有解释:

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'my_group',    # the column you wanted
    F.when(
        F.col('type').isin(['Start', 'End']),
        F.row_number().over(Window.partitionBy('group', 'name', 'type').orderBy('time'))
    )
).withColumn(
    'max_group',    # helper column: get maximum row_number for each group ; will be used later
    F.least(
        F.max(
            F.when(
                F.col('type') == 'Start', F.col('my_group')
            ).otherwise(0)
        ).over(Window.partitionBy('group', 'name')),
        F.max(
            F.when(
                F.col('type') == 'End', F.col('my_group')
            ).otherwise(0)
        ).over(Window.partitionBy('group', 'name'))
    )
).withColumn(
    'my_group',    # mask the rows which don't have corresponding 'start'/'end'
    F.when(
        F.col('my_group') <= F.col('max_group'),
        F.col('my_group')
    )
).withColumn(
    'my_group',    # add the group name
    F.when(F.col('my_group').isNotNull(), F.concat_ws('_', 'group', 'name', 'my_group'))
).drop('max_group').orderBy('group', 'time')
df2.show()
+------+-----+-----+-------------------+--------------+
| group| type| name|               time|      my_group|
+------+-----+-----+-------------------+--------------+
|Group1|Logon|Name1|2021-02-05 19:03:00|          null|
|Group1|Start|Name1|2021-02-05 19:04:00|Group1_Name1_1|
|Group1|Start|Name1|2021-02-05 19:05:00|Group1_Name1_2|
|Group1|  End|Name1|2021-02-05 19:06:00|Group1_Name1_1|
|Group1|  End|Name3|2021-02-05 19:06:01|          null|
|Group1|  End|Name1|2021-02-05 19:07:00|Group1_Name1_2|
|Group2|Start|Name1|2021-02-05 19:04:00|Group2_Name1_1|
|Group2|Start|Name1|2021-02-05 19:05:00|          null|
|Group2|Start|Name2|2021-02-05 19:06:00|          null|
|Group2|  End|Name1|2021-02-05 19:07:00|Group2_Name1_1|
|Group2|Close|Name1|2021-02-05 19:07:00|          null|
+------+-----+-----+-------------------+--------------+

【讨论】:

谢谢@mck。如果我在 group1 中添加新行,则会关闭但无法按预期工作。我已经更新了上面的输入 DF。 如何确保匹配的 'End' 事件总是在 'Start' 事件之后?

以上是关于有条件的 PySpark 窗口的主要内容,如果未能解决你的问题,请参考以下文章

PySpark 中的窗口函数和条件过滤器

在 PySpark 中的窗口上获取与某些条件匹配的第一行

如何根据 PySpark 中窗口聚合的条件计算不同值?

Pyspark 窗口函数,具有对旅行者数量进行取整的条件

PySpark 窗口函数标记满足特定条件的每个分区的第一行

当窗口/分区使用前向填充时,向 pyspark sql 中的 last() 函数添加条件