在 PySpark 中随时间窗口聚合
Posted
技术标签:
【中文标题】在 PySpark 中随时间窗口聚合【英文标题】:Aggregate over time windows in PySpark 【发布时间】:2019-11-15 07:25:37 【问题描述】:我有一个 PySpark 数据框(比如 df
),如下所示:
+-----+-----+----------+-----+
| name| type| timestamp|score|
+-----+-----+----------+-----+
|name1|type1|2012-01-10| 11|
|name1|type1|2012-01-11| 14|
|name1|type1|2012-01-12| 2|
|name1|type3|2012-01-12| 3|
|name1|type3|2012-01-11| 55|
|name1|type1|2012-01-13| 10|
|name1|type2|2012-01-14| 11|
|name1|type2|2012-01-15| 14|
|name2|type2|2012-01-10| 2|
|name2|type2|2012-01-11| 3|
|name2|type2|2012-01-12| 55|
|name2|type1|2012-01-10| 10|
|name2|type1|2012-01-13| 55|
|name2|type1|2012-01-14| 10|
+-----+-----+----------+-----+
在上面的数据框中,对于每个 name
,我想计算在 3 个连续时间戳内有多少 score
的值。例如,对于name1
,我希望能够检测到在2012-01-10
到2012-01-12
之间有5 个score
值,在2012-01-13
到2012-01-15
之间有3 个分数值(等等) name2
)。
在我的输出数据框中,我希望行数少于df
中的行数。具体来说,我希望有 ~1/3 的行数,因为我正在聚合/计数 size=3 的窗口。我仍然希望有一个时间戳列,它代表窗口的第一个条目。我希望窗口不重叠。
如何使用 PySpark 做到这一点?
这是我到目前为止所尝试的:
win = W.orderBy("timestamp").partitionBy("name").rowsBetween(0,3)
df_agg = df.groupBy( "timestamp" , F.col("name") ).agg( F.count( F.col("score") ).over(win) )
但是,当我使用上述技术时,出现以下错误。
org.apache.spark.sql.AnalysisException: expression '`score`' is neither present in the group by, nor is it an aggregate function.
您可以使用以下代码 sn-p 创建df
(示例数据框)。
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window as W
df_Stats = Row("name", "type", "timestamp", "score")
df_stat1 = df_Stats("name1", "type1", "2012-01-10", 11)
df_stat2 = df_Stats("name1", "type1", "2012-01-11", 14)
df_stat3 = df_Stats("name1", "type1", "2012-01-12", 2)
df_stat4 = df_Stats("name1", "type3", "2012-01-12", 3)
df_stat5 = df_Stats("name1", "type3", "2012-01-11", 55)
df_stat6 = df_Stats("name1", "type1", "2012-01-13", 10)
df_stat7 = df_Stats("name1", "type2", "2012-01-14", 11)
df_stat8 = df_Stats("name1", "type2", "2012-01-15", 14)
df_stat9 = df_Stats("name2", "type2", "2012-01-10", 2)
df_stat10 = df_Stats("name2", "type2", "2012-01-11", 3)
df_stat11 = df_Stats("name2", "type2", "2012-01-12", 55)
df_stat12 = df_Stats("name2", "type1", "2012-01-10", 10)
df_stat13 = df_Stats("name2", "type1", "2012-01-13", 55)
df_stat14 = df_Stats("name2", "type1", "2012-01-14", 10)
df_stat_lst = [
df_stat1,
df_stat2,
df_stat3,
df_stat4,
df_stat5,
df_stat6,
df_stat7,
df_stat8,
df_stat9,
df_stat10,
df_stat11,
df_stat12,
df_stat13,
df_stat14
]
df = spark.createDataFrame(df_stat_lst)
【问题讨论】:
【参考方案1】:我已经尝试了以下,告诉我它是否是预期的输出:
from pyspark.sql.window import Window
w = Window.partitionBy("name").orderBy("timestamp").rowsBetween(0, 3)
df_agg = demo_df.withColumn("group_count", F.count("score").over(w))
df_agg.show()
# +-----+-----+----------+-----+-----------+
# | name| type| timestamp|score|group_count|
# +-----+-----+----------+-----+-----------+
# |name1|type1|2012-01-10| 11| 4|
# |name1|type1|2012-01-11| 14| 4|
# |name1|type3|2012-01-11| 55| 4|
# |name1|type1|2012-01-12| 2| 4|
# |name1|type3|2012-01-12| 3| 4|
# |name1|type1|2012-01-13| 10| 3|
# |name1|type2|2012-01-14| 11| 2|
# |name1|type2|2012-01-15| 14| 1|
# |name2|type2|2012-01-10| 2| 4|
# |name2|type1|2012-01-10| 10| 4|
# |name2|type2|2012-01-11| 3| 4|
# |name2|type2|2012-01-12| 55| 3|
# |name2|type1|2012-01-13| 55| 2|
# |name2|type1|2012-01-14| 10| 1|
# +-----+-----+----------+-----+-----------+
partitionBy
相当于 groupBy
的 Window
函数,至少在功能方面。
【讨论】:
@pisall :感谢您尝试回答。但是,在我的输出数据框中,我希望行数更少。具体来说,我希望有 ~1/3 的行数,因为我正在聚合/计数 size=3 的窗口。我仍然希望有一个时间戳列,它代表窗口的第一个条目。我希望窗户不重叠。 @SiddharthSatpathy 您要查找的是window,这与我们定义的不同【参考方案2】:一个选项是创建一个日期范围从数据框的较低日期(可能是 2012-01-10)开始的数据框。使用这个新数据框对您的数据框进行内部连接,以便使用您想要的日期范围获取当前数据,现在您可以使用名称、类型和时间戳创建一个组,并使用总和进行聚合。我认为这是最好的选择。您创建的数据框是使用日期范围制作的,因此不会花费太多时间。
【讨论】:
以上是关于在 PySpark 中随时间窗口聚合的主要内容,如果未能解决你的问题,请参考以下文章