Apache Spark 如何检测重复项?可以修改吗?

Posted

技术标签:

【中文标题】Apache Spark 如何检测重复项?可以修改吗?【英文标题】:How does Apache Spark detect duplicates? Can it be modified? 【发布时间】:2017-08-24 14:40:53 【问题描述】:

Apache Spark 如何检测重复行?

我问的原因是我想有一些不同的行为:

在用于重复检测的列集中,对于其中一些(类型为double),我希望重复检测基于两个值之间的差异低于某个阈值(由我)。

我想这可以使用crossJoin() 和适当的where 语句之后,但是,我希望有一个更优雅的解决方案?

谢谢!

【问题讨论】:

【参考方案1】:

它使用HashArggregate:

scala> df.distinct.explain
== Physical Plan ==
*HashAggregate(keys=[x#12], functions=[])
+- Exchange hashpartitioning(x#12, 200)
   +- *HashAggregate(keys=[x#12], functions=[])
      +- LocalTableScan [x#12]

我希望有一个更优雅的解决方案?

您可以尝试 LSH 运算符提供的近似连接:

Bucketed Random Projection for Euclidean Distance。 MinHash for Jaccard Distance

但它不太可能使用单一功能。

您可以对窗口函数使用类似会话的方法,但这仅在您可以将数据划分为分区时才有用。如果您对近似值满意,您可以使用固定大小范围,然后应用我在Spark - Window with recursion? - Conditionally propagating values across rows中描述的方法@

sort 后跟mapPartitions 可以实现另一个近似值。

df.sortBy("someColumn").rdd.mapPartitions(drop_duplicates).toDF()

dropDuplicates 的实现方式类似于:

def drop_duplicates(xs):
    prev = None
    for x in xs:
        if prev is None or abs(x - prev) > threshold:
            yield x
        prev = x   

通过一些努力,您也可以使其在分区边界上保持一致。

【讨论】:

感谢您提供有趣而有趣的指点。问题是:我正在尝试获得 exact 解决方案。对于近似解决方案,我可以根据阈值进行乘法/除法/舍入,然后完成。顺便说一句:我确实有一个groupID 列,可用于Window.partitionBy('groupID')

以上是关于Apache Spark 如何检测重复项?可以修改吗?的主要内容,如果未能解决你的问题,请参考以下文章

删除Apache Spark DataFrame中的重复项,并保留尚未删除的值的行?

PySpark DataFrame 无法删除重复项

如何使用scala在Apache spark中用空字符串(“”)替换空值[重复]

在 Spark Streaming 中,如何检测空批次?

Apache Spark - 多个 RDD 的交集

如何在 Spark Scala 中的 Schema RDD [从案例类中创建] 中查找重复项以及相应的重复计数?