在 groupBy scala spark 之后保留最近的行
Posted
技术标签:
【中文标题】在 groupBy scala spark 之后保留最近的行【英文标题】:Keep most recent row after groupBy scala spark 【发布时间】:2019-10-29 17:27:48 【问题描述】:我有一个这样的数据框:
+--------+----------+-------------------+------+---------------+
| ID| DATE| TIME| COL1| COL2|
+--------+----------+-------------------+------+---------------+
|21763789|2019-08-29|2019-08-29 17:08:06| 1 | USA|
|29211238|2019-08-27|2019-08-27 11:04:42| 1 | SPAIN|
| 1696884|2019-08-10|2019-08-10 21:07:57| 1 | ITALIA|
| 211801|2019-08-06|2019-08-06 20:42:25| 1 | SPAIN|
|20183201|2019-08-07|2019-08-07 16:59:09| 5001| SPAIN|
|21763789|2019-08-27|2019-08-27 10:14:38| 1 | USA|
|29211238|2019-08-14|2019-08-14 09:39:09| 1 | ITALIA|
|20183201|2019-08-19|2019-08-19 21:30:29| 5001| USA|
|29211238|2019-08-23|2019-08-23 19:00:25| 1 | USA|
| 211801|2019-08-22|2019-08-22 05:22:28| 1 | USA|
| 211801|2019-08-28|2019-08-28 11:58:33| 1 | ITALIA|
|25648097|2019-08-30|2019-08-30 15:10:22| 2 | SPAIN|
|29211238|2019-08-27|2019-08-27 11:04:44| 1 | SPAIN|
|26295227|2019-08-25|2019-08-25 00:08:22| 1 | USA|
|21763789|2019-08-20|2019-08-20 13:04:34| 1 | SPAIN|
| 1696884|2019-08-23|2019-08-23 09:27:50| 1 | ITALIA|
| 6209818|2019-08-03|2019-08-03 14:52:25| 1 | ITALIA|
|26295227|2019-08-21|2019-08-21 12:46:58| 1 | USA|
|29211238|2019-08-22|2019-08-22 17:46:42| 1 | USA|
|21763789|2019-08-07|2019-08-07 13:02:18| 1 | SPAIN|
+--------+----------+-------------------+------+---------------+
我想按 ID 和 DATE 对这个数据框进行分组,然后我想只保留 TIME 列中的最新行:
df.groupBy(col("ID"), col("DATE")).agg(min(col("TIME"))) 也许它有效,但我还有很多其他列,那么我的聚合可能会破坏它们?
val onlyRecent = Window.partitionBy(col("ID"), col("DATE")).orderBy(col("TIME")) 我不知道这是否有用。
请问您有什么想法吗? 谢谢
【问题讨论】:
【参考方案1】:你在正确的轨道上使用窗口函数。基本上,您希望以某种方式“标记”要保留的记录,然后按该标签进行过滤。您使用哪个标签函数完全取决于您要对TIME
列中的重复项执行什么操作。以下将选择“绑定”记录之一(有效地随机)。
val w = Window.partitionBy($"ID", $"DATE").orderBy($"TIME".desc)
df.withColumn("rank", row_number().over(w)).where($"rank" === 1).drop("rank")
如果您想在出现“平局”时保留两条记录,请使用rank()
或dense_rank()
代替row_number()
。
【讨论】:
您好 Travis,非常感谢您的回答。看起来很完美。我明天试试。谢谢以上是关于在 groupBy scala spark 之后保留最近的行的主要内容,如果未能解决你的问题,请参考以下文章
Spark(scala):groupby和聚合值列表到一个基于索引的列表[重复]
Spark/Scala 1.6 如何使用 dataframe groupby agg 来实现以下逻辑?