Spark SQL:与时间窗口聚合

Posted

技术标签:

【中文标题】Spark SQL:与时间窗口聚合【英文标题】:Spark SQL: Aggregate with time window 【发布时间】:2018-05-18 09:05:11 【问题描述】:

我有按时间戳排序的数据,结构如下:

+------------+--------+--------+----------+-------+
| timestamp  |  value | device | subgroup | group |
+------------+--------+--------+----------+-------+
| 1377986440 |      0 |      1 |        0 |     5 |
| 1377986440 |   2.25 |      1 |        0 |     5 |
| 1377986440 |      0 |      2 |        0 |     6 |
| 1377986440 |  0.135 |      0 |        0 |     6 |
| 1377986440 |  0.355 |      0 |        0 |     6 |
+------------+--------+--------+----------+-------+

我已经将时间戳从 Long 更改为 TimestampType,记录从 2013 年 9 月 1 日到 2013 年 9 月 30 日。

我需要在以下时间窗口中计算整个数据集的平均值和标准差:[00:00, 6:00), [06:00,12:00), [12:00, 18 :00)、[18:00,00:00) 和每个组。例如,输出应该是这样的:

+-------+--------------+------+-------+
| group |   timeSlot   |  avg |  std  |
+-------+--------------+------+-------+
|     0 | 00:00 6:00   |  1.4 |  0.25 |
|     0 | 06:00 12:00  |  2.4 |  0.25 |
|   ... | ...          | .... |  .... |
|     3 | 00:00 6:00   |  2.3 |   0.1 |
|     3 | 06:00 12:00  |  0.0 |   0.0 |
|   ... | ...          |  ... |   ... |
+-------+--------------+------+-------+

我尝试使用窗口解释here,所以我将我的 Unix 时间戳转换为 TimestampType,格式为 HH:mm:ss。那么我的代码是:

val data = df
  .select("*")
  .withColumn("timestamp", from_unixtime($"timestamp", "HH:mm:ss"))

 val res = data.select("*")
  .groupBy($"group", window($"timestamp", "6 hours", "6 hours"))
  .agg(avg("value"), stddev("value"))
  .orderBy("group")

但是,如果我没有指定窗口的起点,第一个时间段不是从 00:00 开始,而是从 02:00:00 开始。我得到的输出是:

+--------+------------------------------------------+---------------------+---------------------+
|group   |window                                    |avg(cons)            |stddev_samp(cons)    |
+--------+------------------------------------------+---------------------+---------------------+
|0       |[2018-05-18 02:00:00, 2018-05-18 08:00:00]|1.781448366186445E-4 |0.004346229072242386 |
|0       |[2018-05-18 14:00:00, 2018-05-18 20:00:00]|0.0045980360360061865|0.7113464184007158   |
|0       |[2018-05-18 20:00:00, 2018-05-19 02:00:00]|2.7686190915763437E-4|6.490469208721791E-4 |
|0       |[2018-05-17 20:00:00, 2018-05-18 02:00:00]|0.0016399597206953798|0.12325297254169619  |
|0       |[2018-05-18 08:00:00, 2018-05-18 14:00:00]|2.3354306613988956E-4|5.121337883543223E-4 |
|1       |[2018-05-18 20:00:00, 2018-05-19 02:00:00]|8.319111249637333E-4 |0.00163300686441327  |
|1       |[2018-05-18 14:00:00, 2018-05-18 20:00:00]|0.006463708881068344 |0.7907138759032012   |
|1       |[2018-05-18 02:00:00, 2018-05-18 08:00:00]|6.540241054052753E-4 |0.020490123866864617 |

我应该如何更改我的代码?我尝试了其他解决方案,但没有一个有效

【问题讨论】:

数据集中的第一条记录是什么(时间戳)? @Rumoku 1377986420,0,1,0,3 ;时间戳为 2013 年 9 月 1 日 00:00:20 。然而,查询也应该适用于其他数据集 【参考方案1】:

您可能错误地配置了时区设置。两小时轮班建议您使用 GMT+2 或同等标准。

如果您使用 Spark 2.3(或更高版本),您可以在代码(或配置)中设置时区:

spark.conf.set("spark.sql.session.timeZone", "UTC")

【讨论】:

以上是关于Spark SQL:与时间窗口聚合的主要内容,如果未能解决你的问题,请参考以下文章

[Py]Spark SQL:使用框架的输入行约束窗口的每一帧

Spark ML Transformer - 使用 rangeBetween 在窗口上聚合

Spark窗口函数按行中最常见的值聚合

Spark Window 聚合与 Group By/Join 性能

在spark sql中对窗口函数使用having子句的语义是什么?

窗口聚合函数与分组聚合函数的异同