使用 spark 和 scala 根据总时间查找每个 ID 的前 N ​​个游戏

Posted

技术标签:

【中文标题】使用 spark 和 scala 根据总时间查找每个 ID 的前 N ​​个游戏【英文标题】:Find top N game for every ID based on total time using spark and scala 【发布时间】:2020-09-13 13:07:01 【问题描述】:

根据总时间为每个观看的id 查找前 N 个游戏,所以这是我的输入数据框:

输入DF

id    |    Game  |  Time
 1           A       10
 2           B       100
 1           A       100
 2           C       105
 1           N       103
 2           B       102
 1           N       90
 2           C       110

这是我期待的输出:

输出DF:

id    |  Game   |  Time|
 1          N       193
 1          A       110
 2          C       215
 2          B       202

这是我尝试过的,但没有按预期工作:

val windowDF = Window.partitionBy($"id").orderBy($"Time".desc)
   
   InputDF.withColumn("rank", row_number().over(windowDF))
      .filter("rank<=10")

【问题讨论】:

OutputDF 看起来像 groupBy('id, 'Game).sum('Time) 的结果。当您说“根据总时间查找 topN ...”时,您到底是什么意思?是否应该只总结每组的前 N ​​个? 【参考方案1】:

您的前 N ​​名排名仅适用于个人time,而不适用于每场比赛的total time。一个groupBy/sum 来计算total time,然后在total time 上进行排名就可以了:

val df = Seq(
  (1, "A", 10),
  (2, "B", 100),
  (1, "A", 100),
  (2, "C", 105),
  (1, "N", 103),
  (2, "B", 102),
  (1, "N", 90),
  (2, "C", 110)
).toDF("id", "game", "time")

import org.apache.spark.sql.expressions.Window
val win = Window.partitionBy($"id").orderBy($"total_time".desc)

df.
  groupBy("id", "game").agg(sum("time").as("total_time")).
  withColumn("rank", row_number.over(win)).
  where($"rank" <= 10).
  show
// +---+----+----------+----+
// | id|game|total_time|rank|
// +---+----+----------+----+
// |  1|   N|       193|   1|
// |  1|   A|       110|   2|
// |  2|   C|       215|   1|
// |  2|   B|       202|   2|
// +---+----+----------+----+

【讨论】:

感谢您的帮助。我也用类似的查询解决了它。

以上是关于使用 spark 和 scala 根据总时间查找每个 ID 的前 N ​​个游戏的主要内容,如果未能解决你的问题,请参考以下文章

使用 Spark/Scala 有效地按键分组并查找在特定时间窗口中发生的事件的上一个时间戳

使用 Scala 在以 Spark 中的列值为条件的广播 Map 上执行查找

如何使用scala对spark中rdd的每一行进行排序?

如何在 Spark/Scala 中查找具有许多空值的列

在 Spark 中处理日期

Spark 3.0 排序并应用于组 Scala/Java