每个键触发聚合事件集,包括它们的更改时间戳

Posted

技术标签:

【中文标题】每个键触发聚合事件集,包括它们的更改时间戳【英文标题】:spark aggregate set of events per key including their change timestamps 【发布时间】:2020-03-25 12:28:19 【问题描述】:

对于以下数据框:

+----+--------+-------------------+----+
|user|      dt|         time_value|item|
+----+--------+-------------------+----+
| id1|20200101|2020-01-01 00:00:00|   A|
| id1|20200101|2020-01-01 10:00:00|   B|
| id1|20200101|2020-01-01 09:00:00|   A|
| id1|20200101|2020-01-01 11:00:00|   B|
+----+--------+-------------------+----+

我想捕获所有独特的项目,即collect_set,但保留自己的time_value

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.unix_timestamp
import org.apache.spark.sql.functions.collect_set
import org.apache.spark.sql.types.TimestampType
val timeFormat = "yyyy-MM-dd HH:mm"
val dx = Seq(("id1", "20200101", "2020-01-01 00:00", "A"), ("id1", "20200101","2020-01-01 10:00", "B"), ("id1", "20200101","2020-01-01 9:00", "A"), ("id1", "20200101","2020-01-01 11:00", "B")).toDF("user", "dt","time_value", "item").withColumn("time_value", unix_timestamp(col("time_value"), timeFormat).cast(TimestampType))
dx.show

一个

dx.groupBy("user", "dt").agg(collect_set("item")).show
+----+--------+-----------------+                                               
|user|      dt|collect_set(item)|
+----+--------+-----------------+
| id1|20200101|           [B, A]|
+----+--------+-----------------+

信号从A切换到B时不保留time_value信息。如何保留item中每组的时间值信息?

是否可以在窗口函数中使用 collect_set 以达到预期的效果?目前,我只能想到:

    使用窗口函数来确定事件对 过滤以更改事件 聚合

需要多次洗牌。或者,也可以使用 UDF (collect_list(sort_array(struct(time_value, item)))),但这似乎也很笨拙。

有没有更好的办法?

【问题讨论】:

您的预期结果是什么? 在您当前的聚合中,A 和 B 各有两个不同的“time_value”候选者,应该选择哪个?正如@Lamanus 指出的那样,很难推断出你的最终目标是什么。 【参考方案1】:

我确实会使用窗口函数来隔离更改点,我认为没有其他选择:

val win = Window.partitionBy($"user",$"dt").orderBy($"time_value")

dx
.orderBy($"time_value")
.withColumn("item_change_post",coalesce((lag($"item",1).over(win)=!=$"item"),lit(false)))
.withColumn("item_change_pre",lead($"item_change_post",1).over(win))
.where($"item_change_pre" or $"item_change_post")
.show()

+----+--------+-------------------+----+----------------+---------------+
|user|      dt|         time_value|item|item_change_post|item_change_pre|
+----+--------+-------------------+----+----------------+---------------+
| id1|20200101|2020-01-01 09:00:00|   A|           false|           true|
| id1|20200101|2020-01-01 10:00:00|   B|            true|          false|
+----+--------+-------------------+----+----------------+---------------+

然后使用类似groupBy($"user",$"dt").agg(collect_list(struct($"time_value",$"item")))

我认为不会发生多次随机播放,因为您总是按相同的键进行分区/分组。

您可以尝试通过将初始数据帧聚合到每个 item 的最小/最大 time_value 来提高效率,然后执行与上述相同的操作。

【讨论】:

但是当我想以一组边界结束时 - 那么第二个聚合是必要的。

以上是关于每个键触发聚合事件集,包括它们的更改时间戳的主要内容,如果未能解决你的问题,请参考以下文章

如何为每个键值选择具有最新时间戳的行?

在视频播放期间以特定时间戳触发事件

通过 BigQuery 上的更改事件聚合时间序列

多个 Maven 项目针对一个公共时间戳发布

在 python-influxDB-Grafana 堆栈中存储/显示每个时间戳的完整数据集

使用聚合管道聚合 MongoDB 中的时间戳集合