按最近的时间戳对 Spark DataFrame 中的行进行重复数据删除

Posted

技术标签:

【中文标题】按最近的时间戳对 Spark DataFrame 中的行进行重复数据删除【英文标题】:Dedupe rows in Spark DataFrame by most recent timestamp 【发布时间】:2018-09-20 17:45:51 【问题描述】:

我有一个DataFrame,其架构如下:

root
|- documentId
|- timestamp
|- anotherField

例如,

"d1", "2018-09-20 10:00:00", "blah1"
"d2", "2018-09-20 09:00:00", "blah2"
"d1", "2018-09-20 10:01:00", "blahnew"

请注意,为了便于理解(以及我的方便),我将时间戳显示为字符串。它实际上是一个long,表示自纪元以来的毫秒数。

如此处所示,有重复的行(第 1 行和第 3 行)具有相同的 documentId 但不同的 timestamp(以及可能不同的其他字段)。我想对每个 documentId 进行重复数据删除并仅保留最近的(基于 timestamp)行。

一个简单的df.groupBy("documentId").agg(max("timestamp), ...) 似乎不太可能在这里工作,因为我不知道如何保留对应于满足max("timestamp") 的行中的其他字段。

所以,我想出了一个复杂的方法。

// first find the max timestamp corresponding to each documentId
val mostRecent = df
    .select("documentId", "timestamp")
      .groupBy("documentId")
        .agg(max("timestamp"))

// now join with the original df on timestamp to retain
val dedupedDf = df.join(mostRecent, Seq("documentId", "timestamp"), "inner")

生成的dedupedDf 应该只包含与每个documentId 的最新条目相对应的那些行。

虽然这可行,但我认为这不是正确(或有效)的方法,因为我使用的是 join,这似乎没有必要。

我怎样才能做得更好?我正在寻找纯粹的基于“DataFrame”的解决方案,而不是基于 RDD 的方法(因为 DataBricks 人员在研讨会上反复告诉我们使用 DataFrames 而不是 RDDs)。

【问题讨论】:

您没有使用正常意义上的“重复(行)”。请使用足够的单词和句子来清楚地表达您的意思。这里显然有多行具有相同的子行值。 【参考方案1】:

查看下面的代码可以帮助您实现目标,

val df = Seq(
  ("d1", "2018-09-20 10:00:00", "blah1"),
  ("d2", "2018-09-20 09:00:00", "blah2"),
  ("d1", "2018-09-20 10:01:00", "blahnew")
).toDF("documentId","timestamp","anotherField")

import org.apache.spark.sql.functions.row_number
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"documentId").orderBy($"timestamp".desc)
val Resultdf = df.withColumn("rownum", row_number.over(w))
     .where($"rownum" === 1).drop("rownum")

Resultdf.show()

输入:

+----------+-------------------+------------+
|documentId|          timestamp|anotherField|
+----------+-------------------+------------+
|        d1|2018-09-20 10:00:00|       blah1|
|        d2|2018-09-20 09:00:00|       blah2|
|        d1|2018-09-20 10:01:00|     blahnew|
+----------+-------------------+------------+

输出:

+----------+-------------------+------------+
|documentId|          timestamp|anotherField|
+----------+-------------------+------------+
|        d2|2018-09-20 09:00:00|       blah2|
|        d1|2018-09-20 10:01:00|     blahnew|
+----------+-------------------+------------+

【讨论】:

这是实现结果的最有效方式吗?

以上是关于按最近的时间戳对 Spark DataFrame 中的行进行重复数据删除的主要内容,如果未能解决你的问题,请参考以下文章

Flutter 按时间戳对 Firebase 快照进行排序

如何按时间戳对数组进行排序?

在 Cloudkit.JS 中按时间戳对记录进行排序

按时间戳对组内的行进行排序

按日期排序 Spark DataFrame 列的数组

Spark:保存按“虚拟”列分区的 DataFrame