给定具有开始和结束时间的事件,如何使用 Spark 计算同时发生的事件数?
Posted
技术标签:
【中文标题】给定具有开始和结束时间的事件,如何使用 Spark 计算同时发生的事件数?【英文标题】:Given events with start and end times, how to count the number of simultaneous events using Spark? 【发布时间】:2018-09-26 03:26:35 【问题描述】:给定一个庞大的事件数据集,每个事件的开始和结束时间如下:
+------+--------------------+--------------------+
|id | startTime| endTime|
+------+--------------------+--------------------+
| 1|2018-01-01 00:00:...|2018-01-01 00:00:...|
| 2|2018-01-01 00:00:...|2018-01-01 00:00:...|
| 3|2018-01-01 00:00:...|2018-01-01 00:00:...|
| 4|2018-01-01 00:00:...|2018-01-01 00:00:...|
| 5|2018-01-01 00:00:...|2018-01-01 00:00:...|
| 6|2018-01-01 00:00:...|2018-01-01 00:00:...|
+------+--------------------+--------------------+
如何计算任何给定时间同时发生的事件数?如下:
+--------------------+-----+
| time|count|
+--------------------+-----+
|2018-01-01 00:00:...| 1|
|2018-01-01 00:00:...| 2|
|2018-01-01 00:00:...| 1|
|2018-01-01 00:00:...| 0|
|2018-01-01 00:00:...| 1|
|2018-01-01 00:00:...| 2|
|2018-01-01 00:00:...| 3|
|2018-01-01 00:00:...| 2|
|2018-01-01 00:00:...| 1|
|2018-01-01 00:00:...| 0|
|2018-01-01 00:00:...| 1|
|2018-01-01 00:00:...| 0|
+--------------------+-----+
这是针对 batch
用例,以下是使用 Windows 的尝试(希望有其他使用 Spark
的更优雅/高性能的解决方案):
case class EventWithEnd(source: String, startTime: Timestamp, endTime: Timestamp)
val eventsWithEnd: Dataset[EventWithEnd] = ...
val ws = Window.orderBy("time").rowsBetween(Long.MinValue, 0)
eventsWithEnd
.flatMap(e => List(EventTime(e.startTime, "START"), EventTime(e.endTime, "END")))
.orderBy(asc("time"))
.withColumn("starts", count(when(col("eventType") === "START", true)) over ws)
.withColumn("ends", count(when(col("eventType") === "END", true)) over ws)
.withColumn("count", col("starts") - col("ends"))
.drop("eventType", "starts", "ends")
【问题讨论】:
我不知道足够多的 scala 来为您提供确切的解决方案,但从概念上讲,我会这样做:1. 选择一个分辨率(比如 1 秒),2. 做平面图创建所有时间戳start 和 end 之间是分辨率的倍数,3. 按值计数。 【参考方案1】:此解决方案使用 Dataset API:给定,Es 它计算 ECs
// event where start (s) is inclusive, end (e) is exclusive
case class E(id: String, s: Int, e: Option[Int])
object E
def apply(s: Int, e: Int): E = E(UUID.randomUUID.toString, s, Some(e))
//count of events at t
case class EC(t: Int, count: Int)
//transformation
implicit class EOps(es: Dataset[E])
def counts(implicit spark: SparkSession): Dataset[EC] =
import spark.implicits._
val bs: Dataset[B] = es
.flatMap(e => Seq(B(e.s, "start"), B(e.e.get, "end")))
val ts: Dataset[Int] = bs
.map(b => b.t)
.distinct()
val bbs: Dataset[(Int, B)] = ts
.joinWith(
bs,
ts("value") >= bs("t"),
"left_outer")
bbs
.groupByKey case (l, _) => l
.mapGroups case (k, vs) =>
val count: Int = vs
.map case (_, b) => b.bt
.foldLeft(0) (c, v) =>
v match
case "start" => c + 1
case "end" => c - 1
EC(k, count)
//Example test case
"EOps" should "return counts at time t given events" in
val es: Dataset[E] = Seq(E(1, 3), E(2, 4), E(3, 5))
.toDS
val cs: Dataset[EC] = es
.counts
//cs.explain()
cs.collect() must contain theSameElementsAs
Seq(EC(1, 1), EC(2, 2), EC(3, 2), EC(4, 1), EC(5, 0))
【讨论】:
以上是关于给定具有开始和结束时间的事件,如何使用 Spark 计算同时发生的事件数?的主要内容,如果未能解决你的问题,请参考以下文章
在给定日期时间连续性的情况下,Pandas 输出日期、开始和结束时间以及事件状态
是否可以匹配字符串中所有出现的模式,具有给定的开始和结束? [复制]
如何使用Scala计算Spark中数据框中列的开始索引和结束索引之间的行的平均值?