PySpark 数据框条件按窗口/滞后

Posted

技术标签:

【中文标题】PySpark 数据框条件按窗口/滞后【英文标题】:PySpark dataframe condition by window/lag 【发布时间】:2020-11-30 12:59:00 【问题描述】:

我有一个 Spark 数据框,如下所示:

# For sake of simplicity only one user (uid) is shown, but there are multiple users 
+-------------------+-----+-------+
|start_date         |uid  |count  |
+-------------------+-----+-------+
|2020-11-26 08:30:22|user1|  4    |
|2020-11-26 10:00:00|user1|  3    |
|2020-11-22 08:37:18|user1|  3    |
|2020-11-22 13:32:30|user1|  2    |
|2020-11-20 16:04:04|user1|  2    |
|2020-11-16 12:04:04|user1|  1    |

如果用户过去至少有 count >= x 个事件,我想创建一个新的布尔列,其中的值为 True/False,标记这些带有 True 的事件。例如,对于 x=3,我希望得到:

+-------------------+-----+-------+--------------+
|start_date         |uid  |count  | marked_event |
+-------------------+-----+-------+--------------+
|2020-11-26 08:30:22|user1|  4    |  True        |
|2020-11-26 10:00:00|user1|  3    |  True        |
|2020-11-22 08:37:18|user1|  3    |  True        |
|2020-11-22 13:32:30|user1|  2    |  True        |
|2020-11-20 16:04:04|user1|  2    |  True        |
|2020-11-16 12:04:04|user1|  1    |  False       |

也就是说,对于每个计数 >= 3,我需要将该事件标记为 True,以及之前的 3 个事件。只有 user1 的最后一个事件是 False,因为我在 start_date = 2020-11-22 08:37:18 的事件之前(包括)标记了 3 个事件。

任何想法如何解决这个问题?我的直觉是以某种方式使用窗口/滞后来实现这一点,但我是新手,不知道该怎么做......


编辑:

我最终使用了@mck 解决方案的变体,并修复了一个小错误:原始解决方案具有:

F.max(F.col('begin')).over(w.rowsBetween(0, Window.unboundedFollowing))

条件,无论是否满足“计数”的条件,都会将所有事件之后标记为“开始”。相反,我更改了解决方案,以便窗口仅标记“开始”之前发生的事件:

event = (f.max(f.col('begin')).over(w.rowsBetween(-2, 0))).\ 
          alias('event_post_only') 
# the number of events to mark is 3 from 'begin', 
# including the event itself, so that's -2.
df_marked_events = df_marked_events.select('*', event)

然后为所有在“event_post_only”中为真或在“event_post_only”中为真的事件标记为真

df_marked_events = df_marked_events.withColumn('event', (col('count') >= 3) \
                       | (col('event_post_only')))

这避免了将 True to everything 标记为上游 'begin' == True

【问题讨论】:

【参考方案1】:
import pyspark.sql.functions as F
from pyspark.sql.window import Window

w = Window.partitionBy('uid').orderBy(F.col('count').desc(), F.col('start_date'))

# find the beginning point of >= 3 events
begin = (
    (F.col('count') >= 3) &
    (F.lead(F.col('count')).over(w) < 3)
).alias('begin')
df = df.select('*', begin)

# Mark as event if the event is in any rows after begin, or two rows before begin
event = (
    F.max(F.col('begin')).over(w.rowsBetween(0, Window.unboundedFollowing)) | 
    F.max(F.col('begin')).over(w.rowsBetween(-2,0))
).alias('event')
df = df.select('*', event)

df.show()
+-------------------+-----+-----+-----+-----+
|         start_date|  uid|count|begin|event|
+-------------------+-----+-----+-----+-----+
|2020-11-26 08:30:22|user1|  4.0|false| true|
|2020-11-22 08:37:18|user1|  3.0|false| true|
|2020-11-26 10:00:00|user1|  3.0| true| true|
|2020-11-20 16:04:04|user1|  2.0|false| true|
|2020-11-22 13:32:30|user1|  2.0|false| true|
|2020-11-16 12:04:04|user1|  1.0|false|false|
+-------------------+-----+-----+-----+-----+

【讨论】:

谢谢!!我认为它几乎完全适合我。我花了一段时间才明白 为什么如何 它是如何工作的。非常感谢! 抱歉缺少文档。希望有帮助! 我添加了一些简短的cmets,希望对其他用户有所帮助

以上是关于PySpark 数据框条件按窗口/滞后的主要内容,如果未能解决你的问题,请参考以下文章

pyspark pandas 对象作为数据框 - TypeError

带有窗口函数的 PySpark 数据偏度

Spark 1.5.0 (PySpark) 案例当逻辑和滞后窗口函数

使用带有窗口的多个条件按组更新列

基于另一列中的值的一列上的pyspark滞后函数

将pyspark偏移滞后动态值检索到其他数据帧