删除每个分区的重复项
Posted
技术标签:
【中文标题】删除每个分区的重复项【英文标题】:Drop duplicates for each partition 【发布时间】:2017-02-17 05:41:39 【问题描述】:原始数据
cls, id
----
a, 1
a, 1
----
b, 3
b, 3
b, 4
预期输出
cls, id
----
a, 1
----
b, 3
b, 4
id只能在同一个cls中重复,表示同一个id不存在于clses中。
在那种情况下。
df.dropDuplicates($id)
将在所有分区中随机播放以检查 cls 上的重复项。并重新分区为 200(默认值)
现在,如何为每个分区单独运行 dropDuplicates 以降低计算成本?
类似
df.foreachPartition(_.dropDuplicates())
【问题讨论】:
你能把这个输入的例外输出放在一边吗 请查看我在问题中添加的内容。预期数据已更新。 如果你只有这2个值那么你可以试试distinct()
方法
is distinct() 不只是 dropDuplicates() 的别名? spark.apache.org/docs/1.5.1/api/java/org/apache/spark/sql/…
啊,其实还有10个字段,但是id字段是整行的hash,所以可以用于dropDuplicates($id)。但是在 dropDuplicates() 之后还需要其他 10 个字段。
【参考方案1】:
你可能会追求这样的东西:
val distinct = df.mapPartitions(it =>
val set = Set();
while (it.hasNext)
set += it.next()
return set.iterator
);
【讨论】:
【参考方案2】:Not not not a with set。事实上,如果数据量很大,Set 就太危险了。 您可以想到的一种选择是添加 mapPartitionsWithIndex 并将索引添加为输出迭代器。这样在你的 DF 中,分区索引就存在了。稍后,通过传递分区号和另一个键来应用删除重复项。理想情况下,对于键和映射分区的组合,重复记录会被删除。
【讨论】:
以上是关于删除每个分区的重复项的主要内容,如果未能解决你的问题,请参考以下文章