根据每个组的 spark/scala 时间窗口查找上次发生的时间
Posted
技术标签:
【中文标题】根据每个组的 spark/scala 时间窗口查找上次发生的时间【英文标题】:Find last time occured based on a time window with spark/scala for each group 【发布时间】:2018-04-25 15:26:56 【问题描述】:我想根据时间戳窗口查找特定(用户和设备)发生登录尝试的最后/上一次时间。
For example my initial dataset looks like this:
+--------+-------+-------------------+-------+
|username| device| attempt_at| stat|
+--------+-------+-------------------+-------+
| user1| pc|2018-01-02 07:44:27| failed|
| user1| pc|2018-01-02 07:44:10|Success|
| user2| iphone|2017-12-23 16:58:08|Success|
| user2| iphone|2017-12-23 16:58:30|Success|
| user2| iphone|2017-12-23 16:58:50| failed|
| user1|android|2018-01-02 07:44:37| failed|
| user1|android|2018-01-05 08:33:47| failed|
+--------+-------+-------------------+-------+
//code
val df1 = sc.parallelize(Seq(
("user1", "pc", "2018-01-02 07:44:27", "failed"),
("user1", "pc", "2018-01-02 07:44:10", "Success"),
("user2", "iphone", "2017-12-23 16:58:08", "Success"),
("user2", "iphone", "2017-12-23 16:58:30", "Success"),
("user2", "iphone", "2017-12-23 16:58:50", "failed"),
("user1", "android", "2018-01-02 07:44:37", "failed"),
("user1", "android", "2018-01-05 08:33:47", "failed")
)).toDF("username", "device", "attempt_at", "stat")
我想要什么
1 小时和 7 天的窗口,我可以在其中找到每个特定用户和设备的时间戳中的先前尝试。基本上按用户和设备分组。
例如:对于“user1”和设备“pc”,对于上面的数据集,1 小时窗口和 7 天的先前尝试将是“2018-01-02 07:44:27”。
但是从用户 1 的设备“android”开始,前 7 天的尝试将是“2018-01-02 07:44:27”,但在 1 小时窗口内没有任何尝试,因为在过去 1 小时内没有尝试来自android的user1。
预期输出数据集
// 1 hr window for last known attempt
+--------+-------+---------------------+--------------------+
|username| device| attempt_at| previous_attempt_at|
+--------+-------+---------------------+--------------------+
| user1| pc| 2018-01-02 07:44:10| 2018-01-02 07:44:27|
| user2| iphone| 2017-12-23 16:58:50| 2017-12-23 16:58:30|
+--------+-------+---------------------+--------------------+
// 7 days window for last known attempt
+--------+--------+---------------------+--------------------+
|username| device | attempt_at| previous_attempt_at|
+--------+--------+---------------------+--------------------+
| user1| pc | 2018-01-02 07:44:10| 2018-01-02 07:44:27|
| user1| android| 2018-01-05 08:33:47| 2018-01-02 07:44:37|
| user2| iphone| 2017-12-23 16:58:50| 2017-12-23 16:58:30|
+--------+--------+---------------------+--------------------+
我尝试了什么:
我尝试使用“最后一个”来使用超过 1 小时的窗口。它基于窗口给出当前行的时间戳,而不是前一个。
val w = (Window.partitionBy("username", "device")
.orderBy(col("attempt_at").cast("timestamp").cast("long"))
.rangeBetween(-3600, 0)
)
val df2 = df1.withColumn("previous_attempt_at", last("attempt_at").over(w))
【问题讨论】:
@eliasah 如果你这么认为。我当时删除了我的投票。 【参考方案1】:将.rangeBetween(-3600, 0)
替换为.rangeBetween(-3600, -1)
。
0 是 CURRENT ROW
,所以它总是最后一个。
【讨论】:
-1 是否代表距当前行 1 秒?因为 3600 秒是 1 小时。 是的,确实如此。您将数据转换为纪元时间戳。以上是关于根据每个组的 spark/scala 时间窗口查找上次发生的时间的主要内容,如果未能解决你的问题,请参考以下文章
使用 Spark Scala 为数据中的每个组选择窗口操作后的最新时间戳记录
如何对 spark scala RDD 中的元组列表/数组执行转换?