在 groupBy scala spark 之后保留最近的行

Posted

技术标签:

【中文标题】在 groupBy scala spark 之后保留最近的行【英文标题】:Keep most recent row after groupBy scala spark 【发布时间】:2019-10-29 17:27:48 【问题描述】:

我有一个这样的数据框:

+--------+----------+-------------------+------+---------------+
|      ID|      DATE|               TIME|  COL1|           COL2|
+--------+----------+-------------------+------+---------------+
|21763789|2019-08-29|2019-08-29 17:08:06|  1   |            USA|
|29211238|2019-08-27|2019-08-27 11:04:42|  1   |          SPAIN|
| 1696884|2019-08-10|2019-08-10 21:07:57|  1   |         ITALIA|
|  211801|2019-08-06|2019-08-06 20:42:25|  1   |          SPAIN|
|20183201|2019-08-07|2019-08-07 16:59:09|  5001|          SPAIN|
|21763789|2019-08-27|2019-08-27 10:14:38|  1   |            USA|
|29211238|2019-08-14|2019-08-14 09:39:09|  1   |         ITALIA|
|20183201|2019-08-19|2019-08-19 21:30:29|  5001|            USA|
|29211238|2019-08-23|2019-08-23 19:00:25|  1   |            USA|
|  211801|2019-08-22|2019-08-22 05:22:28|  1   |            USA|
|  211801|2019-08-28|2019-08-28 11:58:33|  1   |         ITALIA|
|25648097|2019-08-30|2019-08-30 15:10:22|  2   |          SPAIN|
|29211238|2019-08-27|2019-08-27 11:04:44|  1   |          SPAIN|
|26295227|2019-08-25|2019-08-25 00:08:22|  1   |            USA|
|21763789|2019-08-20|2019-08-20 13:04:34|  1   |          SPAIN|
| 1696884|2019-08-23|2019-08-23 09:27:50|  1   |         ITALIA| 
| 6209818|2019-08-03|2019-08-03 14:52:25|  1   |         ITALIA|
|26295227|2019-08-21|2019-08-21 12:46:58|  1   |            USA|
|29211238|2019-08-22|2019-08-22 17:46:42|  1   |            USA|
|21763789|2019-08-07|2019-08-07 13:02:18|  1   |          SPAIN|
+--------+----------+-------------------+------+---------------+

我想按 ID 和 DATE 对这个数据框进行分组,然后我想只保留 TIME 列中的最新行:

df.groupBy(col("ID"), col("DATE")).agg(min(col("TIME"))) 也许它有效,但我还有很多其他列,那么我的聚合可能会破坏它们?

val onlyRecent = Window.partitionBy(col("ID"), col("DATE")).orderBy(col("TIME")) 我不知道这是否有用。

请问您有什么想法吗? 谢谢

【问题讨论】:

【参考方案1】:

你在正确的轨道上使用窗口函数。基本上,您希望以某种方式“标记”要保留的记录,然后按该标签进行过滤。您使用哪个标签函数完全取决于您要对TIME 列中的重复项执行什么操作。以下将选择“绑定”记录之一(有效地随机)。

val w = Window.partitionBy($"ID", $"DATE").orderBy($"TIME".desc)

df.withColumn("rank", row_number().over(w)).where($"rank" === 1).drop("rank")

如果您想在出现“平局”时保留两条记录,请使用rank()dense_rank() 代替row_number()

【讨论】:

您好 Travis,非常感谢您的回答。看起来很完美。我明天试试。谢谢

以上是关于在 groupBy scala spark 之后保留最近的行的主要内容,如果未能解决你的问题,请参考以下文章

Spark(scala):groupby和聚合值列表到一个基于索引的列表[重复]

Scala Spark groupBy/Agg 函数

Spark/Scala 1.6 如何使用 dataframe groupby agg 来实现以下逻辑?

Spark Scala数据框具有单个Group By的多个聚合[重复]

Spark数据框:如何在groupBy + sum之后使用

Scala - 对 RDD 上的 Groupby 和 Max