使用 Spark/Scala 有效地按键分组并查找在特定时间窗口中发生的事件的上一个时间戳

Posted

技术标签:

【中文标题】使用 Spark/Scala 有效地按键分组并查找在特定时间窗口中发生的事件的上一个时间戳【英文标题】:Group by key and find the previous timestamp of an event that occured in a specific time window efficiently with Spark/Scala 【发布时间】:2018-04-28 01:10:04 【问题描述】:

注意:我的分组每个组最多可以包含 5-10K 行用于聚合。因此,非常需要高效的代码。

我的数据

val df1 = sc.parallelize(Seq(
  ("user2", "iphone", "2017-12-23 16:58:08", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:12", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:20", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:25", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:35", "Success"),
  ("user2", "iphone", "2017-12-23 16:58:45", "Success")
)).toDF("username", "device", "attempt_at", "stat")
+--------+------+-------------------+-------+
|username|device|         attempt_at|   stat|
+--------+------+-------------------+-------+
|   user2|iphone|2017-12-23 16:58:08|Success|
|   user2|iphone|2017-12-23 16:58:12|Success|
|   user2|iphone|2017-12-23 16:58:20|Success|
|   user2|iphone|2017-12-23 16:58:25|Success|
|   user2|iphone|2017-12-23 16:58:35|Success|
|   user2|iphone|2017-12-23 16:58:45|Success|
+--------+------+-------------------+-------+

我想要什么 按(用户名、设备)对事件发生的最近时间进行分组。

+--------+------+-------------------+-------+-------------------+
|username|device|         attempt_at|   stat|previous_attempt_at|
+--------+------+-------------------+-------+-------------------+
|   user2|iphone|2017-12-23 16:58:45|Success|2017-12-23 16:58:35|
+--------+------+-------------------+-------+-------------------+

所需输出中的异常: 现在既然我提到它必须在一个特定的时间窗口中,例如在下面最后一行的输入数据集中 12 月 23 日的最新日期时间戳。现在如果我想要一个返回 1 天的特定时间窗口并给我最后一次尝试,则 'previous_attempt_at' 列将为空,因为前一天没有任何事件,应该是 1 月 22 日.这完全取决于输入时间戳范围。

//Initial Data
+--------+------+-------------------+-------+
|username|device|         attempt_at|   stat|
+--------+------+-------------------+-------+
|   user2|iphone|2017-12-20 16:58:08|Success|
|   user2|iphone|2017-12-20 16:58:12|Success|
|   user2|iphone|2017-12-20 16:58:20|Success|
|   user2|iphone|2017-12-20 16:58:25|Success|
|   user2|iphone|2017-12-20 16:58:35|Success|
|   user2|iphone|2017-12-23 16:58:45|Success|
+--------+------+-------------------+-------+

// Desired Output
A grouping by (username,device) for the latest time an event occurred.

    +--------+------+-------------------+-------+-------------------+
    |username|device|         attempt_at|   stat|previous_attempt_at|
    +--------+------+-------------------+-------+-------------------+
    |   user2|iphone|2017-12-23 16:58:45|Success|               null|
    +--------+------+-------------------+-------+-------------------+

我有什么

val w = (Window.partitionBy("username", "device")
                 .orderBy(col("attempt_at").cast("timestamp").cast("long"))
                   .rangeBetween(-3600, -1)
                 )

val df2 = df1.withColumn("previous_attempt_at", last("attempt_at").over(w))

+--------+------+-------------------+-------+-------------------+
|username|device|         attempt_at|   stat|previous_attempt_at|
+--------+------+-------------------+-------+-------------------+
|   user2|iphone|2017-12-23 16:58:08|Success|               null|
|   user2|iphone|2017-12-23 16:58:12|Success|2017-12-23 16:58:08|
|   user2|iphone|2017-12-23 16:58:20|Success|2017-12-23 16:58:12|
|   user2|iphone|2017-12-23 16:58:25|Success|2017-12-23 16:58:20|
|   user2|iphone|2017-12-23 16:58:35|Success|2017-12-23 16:58:25|
|   user2|iphone|2017-12-23 16:58:45|Success|2017-12-23 16:58:35|
+--------+------+-------------------+-------+-------------------+

备注。 我拥有的代码对特定用户分组中的每一行进行窗口化。 在处理大量数据时效率非常低,也没有给出最新的尝试。除了最后一行,我不需要所有行。

【问题讨论】:

【参考方案1】:

您只需要一个额外的groupByaggregation,但在此之前,您需要collect_list 函数来累积收集以前的日期,并需要udf 函数来检查以前的尝试_at 在时间范围内限制将三列转换"attempt_at", "stat", "previous_attempt_at")为struct选择最后一列

import org.apache.spark.sql.functions._
import java.time._
import java.time.temporal._
import java.time.format._
def durationUdf = udf((actualtimestamp: String, timestamps: Seq[String])=> 
  val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  val actualDateTime = LocalDateTime.parse(actualtimestamp, formatter)
  val diffDates = timestamps.init.filter(x => LocalDateTime.from(LocalDateTime.parse(x, formatter)).until(actualDateTime, ChronoUnit.DAYS) <= 1)
  if(diffDates.size > 0) diffDates.last else null
)

import org.apache.spark.sql.expressions._
val w = Window.partitionBy("username", "device").orderBy(col("attempt_at").cast("timestamp").cast("long"))

val df2 = df1.withColumn("previous_attempt_at", durationUdf(col("attempt_at"), collect_list("attempt_at").over(w)))
  .withColumn("struct", struct(col("attempt_at").cast("timeStamp").as("attempt_at"),col("stat"), col("previous_attempt_at")))
  .groupBy("username", "device").agg(max("struct").as("struct"))
  .select(col("username"), col("device"), col("struct.attempt_at"), col("struct.stat"), col("struct.previous_attempt_at"))

这应该给你后面的例子

+--------+------+---------------------+-------+-------------------+
|username|device|attempt_at           |stat   |previous_attempt_at|
+--------+------+---------------------+-------+-------------------+
|user2   |iphone|2017-12-23 16:58:45.0|Success|null               |
+--------+------+---------------------+-------+-------------------+

和下面的对于前面的输入data

+--------+------+---------------------+-------+-------------------+
|username|device|attempt_at           |stat   |previous_attempt_at|
+--------+------+---------------------+-------+-------------------+
|user2   |iphone|2017-12-23 16:58:45.0|Success|2017-12-23 16:58:35|
+--------+------+---------------------+-------+-------------------+

您可以通过将udf 函数中的ChronoUnit.DAYS 更改为ChronoUnit.HOURS 等来更改小时的逻辑,以此类推

【讨论】:

谢谢,但是时间窗口呢,假设我想回顾 1 小时、1 周等。例如对于上面的代码,如果 1-5 的所有行都发生了 2-3 小时以前,第 6 行的结果为空。如果回顾的窗口是一周,我们会得到给定的结果。 感谢@Ramesh Maharjan。请查看新编辑,它位于“所需输出下的异常”下 谢谢,请您解释一下“max(struct)”如何从数组中选择最新的时间戳行。它有效,但对我来说毫无意义。如果可以解释 UDF 中的“反向胖箭头”是如何工作的。 struct 具有三个元素,max 函数从 struct 中选择最大值。它首先检查第一个元素,如果 tie 从第二个元素中选择最大值,依此类推。这不是胖反向箭头,它小于或等于符号,( 谢谢你! .但是还有另一个问题,如果我想要每个(用户,设备)的每小时数据怎么办。所以这是最后一次尝试了。 (用户,设备)每小时/每天/每周等。这是另一个问题,还是您可以添加一些除了您拥有的东西。

以上是关于使用 Spark/Scala 有效地按键分组并查找在特定时间窗口中发生的事件的上一个时间戳的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark Scala 高效的全外连接中合并连接多个 DataFrame

如何在 Spark/Scala 中查找具有许多空值的列

无法使用 spark scala 从数据集中的行中获取第一列的值

如何在 Zeppelin/Spark/Scala 中漂亮地打印数据框?

根据每个组的 spark/scala 时间窗口查找上次发生的时间

Spark scala.collection.immutable.$colon$colon 不是字符串模式的有效外部类型