Apache Spark 如何检测重复项?可以修改吗?
Posted
技术标签:
【中文标题】Apache Spark 如何检测重复项?可以修改吗?【英文标题】:How does Apache Spark detect duplicates? Can it be modified? 【发布时间】:2017-08-24 14:40:53 【问题描述】:Apache Spark 如何检测重复行?
我问的原因是我想有一些不同的行为:
在用于重复检测的列集中,对于其中一些(类型为double
),我希望重复检测基于两个值之间的差异低于某个阈值(由我)。
我想这可以使用crossJoin()
和适当的where
语句之后,但是,我希望有一个更优雅的解决方案?
谢谢!
【问题讨论】:
【参考方案1】:它使用HashArggregate
:
scala> df.distinct.explain
== Physical Plan ==
*HashAggregate(keys=[x#12], functions=[])
+- Exchange hashpartitioning(x#12, 200)
+- *HashAggregate(keys=[x#12], functions=[])
+- LocalTableScan [x#12]
我希望有一个更优雅的解决方案?
您可以尝试 LSH 运算符提供的近似连接:
Bucketed Random Projection for Euclidean Distance。 MinHash for Jaccard Distance但它不太可能使用单一功能。
您可以对窗口函数使用类似会话的方法,但这仅在您可以将数据划分为分区时才有用。如果您对近似值满意,您可以使用固定大小范围,然后应用我在Spark - Window with recursion? - Conditionally propagating values across rows中描述的方法@
sort
后跟mapPartitions
可以实现另一个近似值。
df.sortBy("someColumn").rdd.mapPartitions(drop_duplicates).toDF()
dropDuplicates
的实现方式类似于:
def drop_duplicates(xs):
prev = None
for x in xs:
if prev is None or abs(x - prev) > threshold:
yield x
prev = x
通过一些努力,您也可以使其在分区边界上保持一致。
【讨论】:
感谢您提供有趣而有趣的指点。问题是:我正在尝试获得 exact 解决方案。对于近似解决方案,我可以根据阈值进行乘法/除法/舍入,然后完成。顺便说一句:我确实有一个groupID
列,可用于Window.partitionBy('groupID')
。以上是关于Apache Spark 如何检测重复项?可以修改吗?的主要内容,如果未能解决你的问题,请参考以下文章
删除Apache Spark DataFrame中的重复项,并保留尚未删除的值的行?