在 pyspark 中聚合 5 分钟窗口

Posted

技术标签:

【中文标题】在 pyspark 中聚合 5 分钟窗口【英文标题】:Aggregating on 5 minute windows in pyspark 【发布时间】:2016-10-31 11:24:19 【问题描述】:

我有以下数据框df

User | Datetime         | amount | length
A    | 2016-01-01 12:01 | 10     | 20
A    | 2016-01-01 12:03 | 6      | 10
A    | 2016-01-01 12:05 | 1      | 3
A    | 2016-01-01 12:06 | 3      | 5
B    | 2016-01-01 12:01 | 10     | 20
B    | 2016-01-01 12:02 | 8      | 20

我想有效地使用 pyspark 来聚合超过 5 分钟的时间窗口并进行一些计算 - 例如,计算每 5 分钟时间窗口每次使用的平均数量和长度 - df 将如下所示:

User | Datetime         | amount | length
A    | 2016-01-01 12:00 | 8      | 15
B    | 2016-01-01 12:00 | 2      | 4
A    | 2016-01-01 12:05 | 9      | 20

我怎样才能以最有效的方式实现这一目标? 在我使用的熊猫中:

df.groupby(['cs_username', pd.TimeGrouper('5Min')].apply(...)

【问题讨论】:

【参考方案1】:

不幸的是,在 pyspark 中,这看起来不像在 pandas 中那么酷;-) 您可以尝试将日期转换为时间戳并使用模数,例如:

import pyspark.sql.functions as F
seconds = 300
seconds_window = F.from_unixtime(F.unix_timestamp('date') - F.unix_timestamp('date') % seconds)
dataframe.withColumn('5_minutes_window', seconds_window)

然后您可以简单地按新列分组并执行请求的聚合。

【讨论】:

以上是关于在 pyspark 中聚合 5 分钟窗口的主要内容,如果未能解决你的问题,请参考以下文章

如何在 pyspark 中对需要在聚合中聚合的分组数据应用窗口函数?

在 PySpark 中随时间窗口聚合

如何根据 PySpark 中窗口聚合的条件计算不同值?

Spark 1.5.0 (PySpark) 案例当逻辑和滞后窗口函数

pyspark 时间序列数据的高性能滚动/窗口聚合

具有组间聚合结果的 Pyspark 窗口