Spark SQL 滑动窗口差分计算
Posted
技术标签:
【中文标题】Spark SQL 滑动窗口差分计算【英文标题】:Spark SQL sliding window difference computation 【发布时间】:2021-02-22 13:24:18 【问题描述】:如何在 Spark 中计算滑动窗口而不使用 Spark 流?
注意:我不想在当前之前/之后使用WINDOW PARTITION BY ORDER BY k ROWS
,而是使用时间戳。 window
运算符有这样的模式:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(["a": "x", "b": "2021-02-21 01:00:00", "c": "3",
"a": "x", "b": "2021-02-21 02:00:00", "c": "4",
"a": "x", "b": "2021-02-21 03:00:00", "c": "2"])
hour_interval = str(4) + ' hour'
sliding_window = str(60) + ' minute'
from pyspark.sql.functions import col, min, max, sum, when, lit, window, date_format
import time
df_aggregated_time_window = df.groupBy("a", window("b", windowDuration=hour_interval,
slideDuration=sliding_window, startTime="30 minute")).agg(min("c").alias("min_c"))
df_aggregated_time_window.show(truncate=False)
+---+------------------------------------------+-----+
|a |window |min_c|
+---+------------------------------------------+-----+
|x |[2021-02-20 23:00:00, 2021-02-21 03:00:00]|3 |
|x |[2021-02-21 00:00:00, 2021-02-21 04:00:00]|2 |
|x |[2021-02-20 22:00:00, 2021-02-21 02:00:00]|3 |
|x |[2021-02-21 02:00:00, 2021-02-21 06:00:00]|2 |
|x |[2021-02-21 01:00:00, 2021-02-21 05:00:00]|2 |
|x |[2021-02-21 03:00:00, 2021-02-21 07:00:00]|2 |
+---+------------------------------------------+-----+
我想要的结果将为 3 个输入行中的每一个返回 3 个输出行作为 4 小时基于时间的窗口(= 状态)的滑动增量,该窗口每小时提前一小时并每小时触发一次(但是作为这是批处理,不是流式触发应该没那么重要)。
相反,我得到上面的结果,基数 > 所需的行数。
编辑
期望的输出:
输入:
x,2021-02-21 01:00:00",3
x,2021-02-21 02:00:00",4
x,2021-02-21 03:00:00",4
x,2021-02-21 04:00:00",1
输出:
x,2021-02-21 01:00:00", NULL // no single previous record to be found in the previous 3 hours (including self)
x,2021-02-21 02:00:00",3 // as we are currently only computing `min` for simplicity (later it should be max - min to see the deltas) within the last 3 hours the value is 3 (coincidentally the previous row)
x,2021-02-21 03:00:00",3 // within 4 hour window 3 is still the smallest
x,2021-02-21 04:00:00",1 // within the previous <= 3 hours (including self) 1 is smallest
【问题讨论】:
你的意思是df.groupBy(window('b', '1 hour'))
?
否:groupby a, window(4 hours, 1 hour)
即对于每个 a,我想计算 4 小时状态窗口内的最小/最大值(= 增量),但每 1 小时滑动一次。
你能显示一个想要的输出吗?
当然 - 请查看更新后的示例
就window
的行为方式而言,您的输出似乎有很大问题。具体来说,您的定义应从整点后 30 分钟 (startTime="30 minute"
) 开始生成存储桶,而您的定义则应从小时标记开始。
【参考方案1】:
恐怕您对window
表达式的假设不正确。根据其文档here:
def window(timeColumn: Column, windowDuration: String, slideDuration: String, startTime: String): Column
在给定时间戳指定列的情况下,将行分桶到一个或多个时间窗口中。 窗口开始是包含的,但窗口结束是排除的,例如 12:05 将在窗口 [12:05,12:10) 中,但不在 [12:00,12:05) 中。 ...
因此,在您的 4 小时窗口和 1 小时滑动步骤的情况下,将有 6 个桶可以应用聚合:
[2021-02-20 22:00:00, 2021-02-21 02:00:00) <-- first bucket that contains the earliest b = 2021-02-21 01:00:00
[2021-02-20 23:00:00, 2021-02-21 03:00:00)
[2021-02-21 00:00:00, 2021-02-21 04:00:00)
[2021-02-21 01:00:00, 2021-02-21 05:00:00)
[2021-02-21 02:00:00, 2021-02-21 06:00:00)
[2021-02-21 03:00:00, 2021-02-21 07:00:00) <-- last bucket that contains the latest b = 2021-02-21 03:00:00
我不完全理解 “我不想使用 WINDOW PARTITION BY ORDER BY...”,因为这样可以让您有效地满足您的要求以得到一个每个输入的输出行计算为当前小时和前 3 小时的状态。
【讨论】:
使用WINDOW PARTITION BY ORDER BY.
不是基于时间的窗口。它只会允许访问最后 k 条记录(在 SQL 窗口中),而不考虑时间,有时更多或更少的记录会落入这个时间窗口。
有趣!我期望为每个输入记录仅触发一次窗口,并且不会以真正的流媒体方式生成中间(=滑动)记录。感谢您的澄清。
不幸的是,这是真的 - 对于WINDOW PARTITION BY ORDER BY...
,我们无法直接以基于时间的术语定义框架。以上是关于Spark SQL 滑动窗口差分计算的主要内容,如果未能解决你的问题,请参考以下文章
Spark SQL DSL 中的窗口(固定、滑动等)和水印支持
Apache Spark - 处理临时 RDD 上的滑动窗口