使用 Spark Scala 为数据中的每个组选择窗口操作后的最新时间戳记录

Posted

技术标签:

【中文标题】使用 Spark Scala 为数据中的每个组选择窗口操作后的最新时间戳记录【英文标题】:Select latest timestamp record after a window operation for every group in the data with Spark Scala 【发布时间】:2018-04-29 00:24:32 【问题描述】:

我在一天的时间窗口 (86400) 内对(用户、应用程序)的尝试进行了计数。我想用 latest timestamp with the count 提取行并删除不必要的先前计数。确保您的答案考虑了时间窗口。一个拥有 1 台设备的用户可以在一天或一周内进行多次尝试,我希望能够在每个特定窗口中检索那些具有最终计数的特定时刻。

我的初始数据集是这样的:

val df = sc.parallelize(Seq(
  ("user1", "iphone", "2017-12-22 10:06:18", "Success"),
  ("user1", "iphone", "2017-12-22 11:15:12",  "failed"),
  ("user1", "iphone", "2017-12-22 12:06:18", "Success"),
  ("user1", "iphone", "2017-12-22 09:15:12",  "failed"),
  ("user1", "iphone", "2017-12-20 10:06:18", "Success"),
  ("user1", "iphone", "2017-12-20 11:15:12",  "failed"),
  ("user1", "iphone", "2017-12-20 12:06:18", "Success"),
  ("user1", "iphone", "2017-12-20 09:15:12",  "failed"),
  ("user1", "android", "2017-12-20 09:25:20", "Success"),
  ("user1", "android", "2017-12-20 09:44:22", "Success"),
  ("user1", "android", "2017-12-20 09:58:22", "Success"),
  ("user1", "iphone", "2017-12-20 16:44:20", "Success"),
  ("user1", "iphone", "2017-12-20 16:44:25", "Success"),
  ("user1", "iphone", "2017-12-20 16:44:35", "Success")
)).toDF("username", "device", "date_time", "status")

我运行的代码和得到的结果

// Basically I'm looking 1 day which is 86400 seconds
val w1 = Window.partitionBy("username", "device")
               .orderBy(col("date_time").cast("date_time").cast("long").desc)
               .rangeBetween(-86400, 0) 


val countEveryAttemptDF = df.withColumn("attempts", count("device").over(w1))

现在我有

// countEveryAttemptDF.show
+--------+--------------+---------------------+-------+--------+
|username|.       device|            date_time| status|attempts|
+--------+--------------+---------------------+-------+--------+
|   user1|       android|  2017-12-20 09:58:22|Success|       1|
|   user1|       android|  2017-12-20 09:44:22|Success|       2|
|   user1|       android|  2017-12-20 09:25:20|Success|       3|
|   user1|        iphone|  2017-12-22 12:06:18|Success|       1|
|   user1|        iphone|  2017-12-22 11:15:12| failed|       2|
|   user1|        iphone|  2017-12-22 10:06:18|Success|       3|
|   user1|        iphone|  2017-12-22 09:15:12| failed|       4|
|   user1|        iphone|  2017-12-20 16:44:35|Success|       1|
|   user1|        iphone|  2017-12-20 16:44:25|Success|       2|
|   user1|        iphone|  2017-12-20 16:44:20|Success|       3|
|   user1|        iphone|  2017-12-20 12:06:18|Success|       4|
|   user1|        iphone|  2017-12-20 11:15:12| failed|       5|
|   user1|        iphone|  2017-12-20 10:06:18|Success|       6|
|   user1|        iphone|  2017-12-20 09:15:12| failed|       7|
+--------+--------------+---------------------+-------+--------+

我想要什么。 因此,通过确保我在同一时间窗口中,我想要最新的时间戳及其计数。

+--------+--------------+---------------------+-------+--------+
|username|.       device|            date_time| status|attempts|
+--------+--------------+---------------------+-------+--------+
|  user1     |       android    |  2017-12-20 09:25:20|Success|       3|
|  user1     |        iphone    |  2017-12-22 09:15:12| failed|       4|
|  user1     |        iphone    |  2017-12-20 09:15:12| failed|       7|
+--------+--------------+---------------------+-------+--------+**

【问题讨论】:

如果你只partitonBy "username", "application_id" 你不会得到你得到的输出。您也应该使用 transaction_date_time 的日期值来获取您得到的输出。不是这样吗? How to select the first row of each group?的可能重复 【参考方案1】:

你快到了。您已经通过查看一天的范围计算出了计数。现在你所要做的就是找出这一天范围内的最新记录,这可以通过在同一窗口函数上使用 last 来完成,但范围相反

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._

def day(x: Int) = x * 86400

val w1 = Window.partitionBy("username", "device")
  .orderBy(col("date_time").cast("timestamp").cast("long").desc)
  .rangeBetween(-day(1), 0)
val w2 = Window.partitionBy("username", "device")
  .orderBy(col("date_time").cast("timestamp").cast("long").desc)
  .rangeBetween(0, day(1))

val countEveryAttemptDF = df.withColumn("attempts", count("application_id").over(w1))
                            .withColumn("att", last("attempts").over(w2))
                            .filter(col("attempts") === col("att"))
                            .drop("att")

这应该给你

+--------+--------------+---------------------+-------+--------+
|username|        device|            date_time| status|attempts|
+--------+--------------+---------------------+-------+--------+
|user1   |android       |2017-12-20 09:25:20  |Success|3       |
|user1   |iphone        |2017-12-22 09:15:12  | Failed|4       |
|user1   |iphone        |2017-12-20 09:15:12  | Failed|7       |
+--------+--------------+---------------------+-------+--------+

类似于下面的 cmets 中所述

1 天有 86400 秒。我想回顾1天。同样,3600 秒是 1 小时。 1 周 604,800 秒

您可以将星期功能更改为如下所示的小时和周,并在窗口中使用它们rangeBetween

def hour(x: Int) = x * 3600
def week(x: Int) = x * 604800

希望回答对你有帮助

【讨论】:

等等,你如何定义 1 小时/1 周的时间窗口?它需要模块化。你能解释一下 Long.MinValue 和 0 是什么意思吗? 1天有86400秒。我想回顾1天。同样,3600 秒是 1 小时。 1 周 604,800 秒。我希望能够像这样用数据及时回顾。如果你的代码做到了 lmk @data-maniac,感谢您的澄清。 :) 我已经更新了我的答案,我想我已经完全按照你的意图回答了

以上是关于使用 Spark Scala 为数据中的每个组选择窗口操作后的最新时间戳记录的主要内容,如果未能解决你的问题,请参考以下文章

为spark scala中的数据框中的每个组采样不同数量的随机行

Spark Scala 聚合组 Dataframe

如何在 Scala/Spark 中为数据框中的每一行编写一个 Json 文件并重命名文件

Spark Scala - 如何为每个组创建新列,然后在 spark 数据框中分解列值

Scala中的Spark分组映射UDF

如何对 spark scala RDD 中的元组列表/数组执行转换?