给定具有开始和结束时间的事件,如何使用 Spark 计算同时发生的事件数?

Posted

技术标签:

【中文标题】给定具有开始和结束时间的事件,如何使用 Spark 计算同时发生的事件数?【英文标题】:Given events with start and end times, how to count the number of simultaneous events using Spark? 【发布时间】:2018-09-26 03:26:35 【问题描述】:

给定一个庞大的事件数据集,每个事件的开始和结束时间如下:

+------+--------------------+--------------------+
|id    |           startTime|             endTime|
+------+--------------------+--------------------+
|     1|2018-01-01 00:00:...|2018-01-01 00:00:...|
|     2|2018-01-01 00:00:...|2018-01-01 00:00:...|
|     3|2018-01-01 00:00:...|2018-01-01 00:00:...|
|     4|2018-01-01 00:00:...|2018-01-01 00:00:...|
|     5|2018-01-01 00:00:...|2018-01-01 00:00:...|
|     6|2018-01-01 00:00:...|2018-01-01 00:00:...|
+------+--------------------+--------------------+

如何计算任何给定时间同时发生的事件数?如下:

+--------------------+-----+
|                time|count|
+--------------------+-----+
|2018-01-01 00:00:...|    1|
|2018-01-01 00:00:...|    2|
|2018-01-01 00:00:...|    1|
|2018-01-01 00:00:...|    0|
|2018-01-01 00:00:...|    1|
|2018-01-01 00:00:...|    2|
|2018-01-01 00:00:...|    3|
|2018-01-01 00:00:...|    2|
|2018-01-01 00:00:...|    1|
|2018-01-01 00:00:...|    0|
|2018-01-01 00:00:...|    1|
|2018-01-01 00:00:...|    0|
+--------------------+-----+

这是针对 batch 用例,以下是使用 Windows 的尝试(希望有其他使用 Spark 的更优雅/高性能的解决方案):

case class EventWithEnd(source: String, startTime: Timestamp, endTime: Timestamp)

val eventsWithEnd: Dataset[EventWithEnd] = ...

val ws = Window.orderBy("time").rowsBetween(Long.MinValue, 0)

eventsWithEnd
    .flatMap(e => List(EventTime(e.startTime, "START"), EventTime(e.endTime, "END")))
    .orderBy(asc("time"))
    .withColumn("starts", count(when(col("eventType") === "START", true)) over ws)
    .withColumn("ends", count(when(col("eventType") === "END", true)) over ws)
    .withColumn("count", col("starts") - col("ends"))
    .drop("eventType", "starts", "ends")

【问题讨论】:

我不知道足够多的 scala 来为您提供确切的解决方案,但从概念上讲,我会这样做:1. 选择一个分辨率(比如 1 秒),2. 做平面图创建所有时间戳start 和 end 之间是分辨率的倍数,3. 按值计数。 【参考方案1】:

此解决方案使用 Dataset API:给定,Es 它计算 ECs

// event where start (s) is inclusive, end (e) is exclusive
case class E(id: String, s: Int, e: Option[Int])

object E 
  def apply(s: Int, e: Int): E = E(UUID.randomUUID.toString, s, Some(e))


//count of events at t
case class EC(t: Int, count: Int)

//transformation
implicit class EOps(es: Dataset[E]) 

    def counts(implicit spark: SparkSession): Dataset[EC] = 
      import spark.implicits._

      val bs: Dataset[B] = es
        .flatMap(e => Seq(B(e.s, "start"), B(e.e.get, "end")))

      val ts: Dataset[Int] = bs
        .map(b => b.t)
        .distinct()

      val bbs: Dataset[(Int, B)] = ts
        .joinWith(
          bs,
          ts("value") >= bs("t"),
          "left_outer")

      bbs
        .groupByKey  case (l, _) => l 
        .mapGroups  case (k, vs) =>
          val count: Int = vs
            .map  case (_, b) => b.bt 
            .foldLeft(0)  (c, v) =>
              v match 
                case "start" => c + 1
                case "end" => c - 1
              
            
          EC(k, count)
        
    
  

//Example test case
"EOps" should "return counts at time t given events" in 

    val es: Dataset[E] = Seq(E(1, 3), E(2, 4), E(3, 5))
      .toDS

    val cs: Dataset[EC] = es
      .counts

    //cs.explain()

    cs.collect() must contain theSameElementsAs
      Seq(EC(1, 1), EC(2, 2), EC(3, 2), EC(4, 1), EC(5, 0))
  

【讨论】:

以上是关于给定具有开始和结束时间的事件,如何使用 Spark 计算同时发生的事件数?的主要内容,如果未能解决你的问题,请参考以下文章

在给定日期时间连续性的情况下,Pandas 输出日期、开始和结束时间以及事件状态

是否可以匹配字符串中所有出现的模式,具有给定的开始和结束? [复制]

如何使用Scala计算Spark中数据框中列的开始索引和结束索引之间的行的平均值?

如何找出给定日期和开始和结束时间可用的所有持续时间:java

给定一个 Unix 时间戳,如何获得那一天的开始和结束?

使用fullCalendar将allDay设置为false时,如何动态设置事件开始和结束时间?