如何根据自定义逻辑在 spark 数据框中删除重复行?

Posted

技术标签:

【中文标题】如何根据自定义逻辑在 spark 数据框中删除重复行?【英文标题】:How to de duplicate rows in spark dataframe based on custom logic? 【发布时间】:2021-01-04 23:16:06 【问题描述】:

我有一个如下所示的 spark 数据框:

Id,timestamp,index,target
id1,2020-04-03,1,34
id1,2020-04-03,2,37
id1,2020-04-04,1,31
id1,2020-04-05,1,29
id2,2020-04-03,1,35
...

数据框在“Id”列上的集群中进行分区。

我想确保没有“Id”和“timestamp”值重复的行。 如果有重复条目,那么我想选择“索引”值较低的行。 (如果在“Id”、“timestamp”、“index”中存在具有相同条目的重复行;那么选择任何行都可以)

所以上面的去重后的dataframe应该是这样的:

Id,timestamp,index,target
id1,2020-04-03,1,34
id1,2020-04-04,1,31
id1,2020-04-05,1,29
id2,2020-04-03,1,35
...

请注意,第二行 已被删除。

由于数据帧已经在“Id”上进行了分区 - 我希望找到一种不需要跨分区通信的方法,从而使操作非常高效。

【问题讨论】:

【参考方案1】:
val df = Seq(
  ("id1", "2020-04-03", "2", "34"),
  ("id1", "2020-04-03", "3", "34"),
  ("id1", "2020-04-03", "1", "37"),
  ("id1", "2020-04-03", "5", "34"),
  ("id1", "2020-04-04", "1", "31"),
  ("id1", "2020-04-05", "1", "29"),
  ("id2", "2020-04-03", "1", "35")).toDF("Id", "timestamp", "index", "target")

df.sort("index").dropDuplicates("Id", "timestamp").orderBy("timestamp").show()
+---+----------+-----+------+
| Id| timestamp|index|target|
+---+----------+-----+------+
|id1|2020-04-03|    1|    37|
|id2|2020-04-03|    1|    35|
|id1|2020-04-04|    1|    31|
|id1|2020-04-05|    1|    29|
+---+----------+-----+------+

您可以按索引排序,然后使用删除重复来实现并保持较低的索引。

【讨论】:

我想这行得通,但正如问题所问的那样,它没有使用 DF 已经由 id 预先分区的事实,这可以用来避免你的sort 的洗牌做。我看到它的方式(但我没有给予足够的考虑),将您的排序更改为id, timestamp, index 按此顺序会更有效。 DF按id分区,这不应该打乱,然后dropDuplicates自然会保持最低索引。

以上是关于如何根据自定义逻辑在 spark 数据框中删除重复行?的主要内容,如果未能解决你的问题,请参考以下文章

根据条件从 spark 数据框中删除行

Spark - 如何使用列对数据框中的字符串进行切片[重复]

Scala(Spark)连接数据框中的列[重复]

如何根据条件表达式从熊猫数据框中删除行[重复]

如何根据条件表达式从熊猫数据框中删除行[重复]

如何使用 JAVA 在 Spark SQL 中基于单列删除重复行