spark:如何在数据帧上进行 dropDuplicates,同时保持最高时间戳行 [重复]

Posted

技术标签:

【中文标题】spark:如何在数据帧上进行 dropDuplicates,同时保持最高时间戳行 [重复]【英文标题】:spark: How to do a dropDuplicates on a dataframe while keeping the highest timestamped row [duplicate] 【发布时间】:2016-04-14 12:59:17 【问题描述】:

我有一个用例,我需要删除数据帧的重复行(在这种情况下,重复意味着它们具有相同的“id”字段),同时保留具有最高“时间戳”(unix 时间戳)字段的行.

我找到了 drop_duplicate 方法(我正在使用 pyspark),但无法控制要保留的项目。

有人可以帮忙吗?提前谢谢

【问题讨论】:

【参考方案1】:

可能需要手动 map 和 reduce 来提供您想要的功能。

def selectRowByTimeStamp(x,y):
    if x.timestamp > y.timestamp:
        return x
    return y

dataMap = data.map(lambda x: (x.id, x))
uniqueData = dataMap.reduceByKey(selectRowByTimeStamp) 

在这里,我们根据 id 对所有数据进行分组。然后,当我们减少分组时,我们通过保留具有最高时间戳的记录来做到这一点。当代码完成reducing时,每个id只剩下1条记录。

【讨论】:

dataMap的目的是什么? 其实应该是data.map(lambda x: (x.id, x))(或者keyBy)。让我们修复它。 绝对正确,不错的收获 谢谢大卫,我最终使用了你提出的解决方案,尽管大卫格里芬的解决方案同样有效。【参考方案2】:

你可以这样做:

val df = Seq(
  (1,12345678,"this is a test"),
  (1,23456789, "another test"),
  (2,2345678,"2nd test"),
  (2,1234567, "2nd another test")
).toDF("id","timestamp","data")

+---+---------+----------------+
| id|timestamp|            data|
+---+---------+----------------+
|  1| 12345678|  this is a test|
|  1| 23456789|    another test|
|  2|  2345678|        2nd test|
|  2|  1234567|2nd another test|
+---+---------+----------------+

df.join(
  df.groupBy($"id").agg(max($"timestamp") as "r_timestamp").withColumnRenamed("id", "r_id"),
  $"id" === $"r_id" && $"timestamp" === $"r_timestamp"
).drop("r_id").drop("r_timestamp").show
+---+---------+------------+
| id|timestamp|        data|
+---+---------+------------+
|  1| 23456789|another test|
|  2|  2345678|    2nd test|
+---+---------+------------+

如果您预计 id 可能会出现重复的 timestamp(请参阅下面的 cmets),您可以这样做:

df.dropDuplicates(Seq("id", "timestamp")).join(
  df.groupBy($"id").agg(max($"timestamp") as "r_timestamp").withColumnRenamed("id", "r_id"),
  $"id" === $"r_id" && $"timestamp" === $"r_timestamp"
).drop("r_id").drop("r_timestamp").show

【讨论】:

接近但不保证每个 id 有一行。 嗯,你的意思是如果时间戳是一样的? 是的,完全正确。我猜应该可以在这里任意取一个。 如果您知道自己的数据,这可能不是问题之一。就像你说的,你总是可以在做剩下的事情之前做:df.groupBy($"id", $"timestamp").agg(last($"data")) 是的。 drop_duplicate 可能比 last 更通用。您可以在不混合值的情况下处理完整的行。

以上是关于spark:如何在数据帧上进行 dropDuplicates,同时保持最高时间戳行 [重复]的主要内容,如果未能解决你的问题,请参考以下文章

Spark:如何从 Spark 数据帧行解析和转换 json 字符串

如何将spark数据帧列名和行数据转换为json数据

Spark:如何通过 mapInPandas 正确转换数据帧

Spark多个动态聚合函数,countDistinct不起作用

按行索引拆分 Spark 数据帧

使用行分隔符拆分 Spark 数据帧