在 PySpark 中随时间窗口聚合

Posted

技术标签:

【中文标题】在 PySpark 中随时间窗口聚合【英文标题】:Aggregate over time windows in PySpark 【发布时间】:2019-11-15 07:25:37 【问题描述】:

我有一个 PySpark 数据框(比如 df),如下所示:

+-----+-----+----------+-----+
| name| type| timestamp|score|
+-----+-----+----------+-----+
|name1|type1|2012-01-10|   11|
|name1|type1|2012-01-11|   14|
|name1|type1|2012-01-12|    2|
|name1|type3|2012-01-12|    3|
|name1|type3|2012-01-11|   55|
|name1|type1|2012-01-13|   10|
|name1|type2|2012-01-14|   11|
|name1|type2|2012-01-15|   14|
|name2|type2|2012-01-10|    2|
|name2|type2|2012-01-11|    3|
|name2|type2|2012-01-12|   55|
|name2|type1|2012-01-10|   10|
|name2|type1|2012-01-13|   55|
|name2|type1|2012-01-14|   10|
+-----+-----+----------+-----+

在上面的数据框中,对于每个 name,我想计算在 3 个连续时间戳内有多少 score 的值。例如,对于name1,我希望能够检测到在2012-01-102012-01-12 之间有5 个score 值,在2012-01-132012-01-15 之间有3 个分数值(等等) name2)。

在我的输出数据框中,我希望行数少于df 中的行数。具体来说,我希望有 ~1/3 的行数,因为我正在聚合/计数 size=3 的窗口。我仍然希望有一个时间戳列,它代表窗口的第一个条目。我希望窗口不重叠。

如何使用 PySpark 做到这一点?

这是我到目前为止所尝试的:

win = W.orderBy("timestamp").partitionBy("name").rowsBetween(0,3)
df_agg = df.groupBy( "timestamp" , F.col("name")  ).agg( F.count( F.col("score") ).over(win) )

但是,当我使用上述技术时,出现以下错误。

 org.apache.spark.sql.AnalysisException: expression '`score`' is neither present in the group by, nor is it an aggregate function.

您可以使用以下代码 sn-p 创建df(示例数据框)。

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window as W

df_Stats = Row("name", "type", "timestamp", "score")

df_stat1 = df_Stats("name1", "type1", "2012-01-10", 11)
df_stat2 = df_Stats("name1", "type1", "2012-01-11", 14)
df_stat3 = df_Stats("name1", "type1", "2012-01-12", 2)
df_stat4 = df_Stats("name1", "type3", "2012-01-12", 3)
df_stat5 = df_Stats("name1", "type3", "2012-01-11", 55)
df_stat6 = df_Stats("name1", "type1", "2012-01-13", 10)
df_stat7 = df_Stats("name1", "type2", "2012-01-14", 11)
df_stat8 = df_Stats("name1", "type2", "2012-01-15", 14)
df_stat9 = df_Stats("name2", "type2", "2012-01-10", 2)
df_stat10 = df_Stats("name2", "type2", "2012-01-11", 3)
df_stat11 = df_Stats("name2", "type2", "2012-01-12", 55)
df_stat12 = df_Stats("name2", "type1", "2012-01-10", 10)
df_stat13 = df_Stats("name2", "type1", "2012-01-13", 55)
df_stat14 = df_Stats("name2", "type1", "2012-01-14", 10)

df_stat_lst = [
    df_stat1,
    df_stat2,
    df_stat3,
    df_stat4,
    df_stat5,
    df_stat6,
    df_stat7,
    df_stat8,
    df_stat9,
    df_stat10,
    df_stat11,
    df_stat12,
    df_stat13,
    df_stat14
]

df = spark.createDataFrame(df_stat_lst)

【问题讨论】:

【参考方案1】:

我已经尝试了以下,告诉我它是否是预期的输出:

from pyspark.sql.window import Window

w = Window.partitionBy("name").orderBy("timestamp").rowsBetween(0, 3)
df_agg = demo_df.withColumn("group_count", F.count("score").over(w))
df_agg.show()

# +-----+-----+----------+-----+-----------+
# | name| type| timestamp|score|group_count|
# +-----+-----+----------+-----+-----------+
# |name1|type1|2012-01-10|   11|          4|
# |name1|type1|2012-01-11|   14|          4|
# |name1|type3|2012-01-11|   55|          4|
# |name1|type1|2012-01-12|    2|          4|
# |name1|type3|2012-01-12|    3|          4|
# |name1|type1|2012-01-13|   10|          3|
# |name1|type2|2012-01-14|   11|          2|
# |name1|type2|2012-01-15|   14|          1|
# |name2|type2|2012-01-10|    2|          4|
# |name2|type1|2012-01-10|   10|          4|
# |name2|type2|2012-01-11|    3|          4|
# |name2|type2|2012-01-12|   55|          3|
# |name2|type1|2012-01-13|   55|          2|
# |name2|type1|2012-01-14|   10|          1|
# +-----+-----+----------+-----+-----------+

partitionBy 相当于 groupByWindow 函数,至少在功能方面。

【讨论】:

@pisall :感谢您尝试回答。但是,在我的输出数据框中,我希望行数更少。具体来说,我希望有 ~1/3 的行数,因为我正在聚合/计数 size=3 的窗口。我仍然希望有一个时间戳列,它代表窗口的第一个条目。我希望窗户不重叠。 @SiddharthSatpathy 您要查找的是window,这与我们定义的不同【参考方案2】:

一个选项是创建一个日期范围从数据框的较低日期(可能是 2012-01-10)开始的数据框。使用这个新数据框对您的数据框进行内部连接,以便使用您想要的日期范围获取当前数据,现在您可以使用名称、类型和时间戳创建一个组,并使用总和进行聚合。我认为这是最好的选择。您创建的数据框是使用日期范围制作的,因此不会花费太多时间。

【讨论】:

以上是关于在 PySpark 中随时间窗口聚合的主要内容,如果未能解决你的问题,请参考以下文章

在 pyspark 中聚合 5 分钟窗口

pyspark 时间序列数据的高性能滚动/窗口聚合

pyspark如何在窗口内聚合

具有组间聚合结果的 Pyspark 窗口

如何根据 PySpark 中窗口聚合的条件计算不同值?

Pyspark SQL/SQL 中的窗口和聚合函数