PySpark - 获取重复行的索引

Posted

技术标签:

【中文标题】PySpark - 获取重复行的索引【英文标题】:PySpark - Get indices of duplicate rows 【发布时间】:2018-06-14 20:55:33 【问题描述】:

假设我有一个 PySpark 数据框,如下所示:

+--+--+--+--+
|a |b |c |d |
+--+--+--+--+
|1 |0 |1 |2 |
|0 |2 |0 |1 |
|1 |0 |1 |2 |
|0 |4 |3 |1 |
+--+--+--+--+

如何创建一个标记所有重复行的列,如下所示:

+--+--+--+--+--+
|a |b |c |d |e |
+--+--+--+--+--+
|1 |0 |1 |2 |1 |
|0 |2 |0 |1 |0 |
|1 |0 |1 |2 |1 |
|0 |4 |3 |1 |0 |
+--+--+--+--+--+

我尝试使用 groupBy 和聚合函数无济于事。

【问题讨论】:

不是完全的欺骗,但this answer 是一种方法。试试看:df.groupBy(df.columns).count().show() 【参考方案1】:

只是为了扩展my comment:

您可以按所有列分组并使用pyspark.sql.functions.count() 确定列是否重复:

import pyspark.sql.functions as f
df.groupBy(df.columns).agg((f.count("*")>1).cast("int").alias("e")).show()
#+---+---+---+---+---+
#|  a|  b|  c|  d|  e|
#+---+---+---+---+---+
#|  1|  0|  1|  2|  1|
#|  0|  2|  0|  1|  0|
#|  0|  4|  3|  1|  0|
#+---+---+---+---+---+

这里我们使用count("*") > 1 作为聚合函数,并将结果转换为intgroupBy() 将导致删除重复行。根据您的需要,这可能就足够了。

但是,如果您想保留所有行,可以使用 Window 函数,如其他答案所示,或者您可以使用 join()

df.join(
    df.groupBy(df.columns).agg((f.count("*")>1).cast("int").alias("e")),
    on=df.columns,
    how="inner"
).show()
#+---+---+---+---+---+
#|  a|  b|  c|  d|  e|
#+---+---+---+---+---+
#|  1|  0|  1|  2|  1|
#|  1|  0|  1|  2|  1|
#|  0|  2|  0|  1|  0|
#|  0|  4|  3|  1|  0|
#+---+---+---+---+---+

在这里,我们将原始数据帧与上面所有列上groupBy() 的结果进行内部连接。

【讨论】:

【参考方案2】:

定义一个window函数来检查所有列分组时行的count是否大于1。如果是,则重复(1)否则不重复(0)

allColumns = df.columns
import sys
from pyspark.sql import functions as f
from pyspark.sql import window as w
windowSpec = w.Window.partitionBy(allColumns).rowsBetween(-sys.maxint, sys.maxint)

df.withColumn('e', f.when(f.count(f.col('d')).over(windowSpec) > 1, f.lit(1)).otherwise(f.lit(0))).show(truncate=False) 

这应该给你

+---+---+---+---+---+
|a  |b  |c  |d  |e  |
+---+---+---+---+---+
|1  |0  |1  |2  |1  |
|1  |0  |1  |2  |1  |
|0  |2  |0  |1  |0  |
|0  |4  |3  |1  |0  |
+---+---+---+---+---+

希望回答对你有帮助

更新

作为@pault commented,您可以通过将boolean 转换为integer 来消除whencollit

df.withColumn('e', (f.count('*').over(windowSpec) > 1).cast('int')).show(truncate=False)

【讨论】:

这里不需要whencollit - 您可以将条件转换为整数:df.withColumn('e', (f.count('*').over(windowSpec) > 1).cast('int')).show(truncate=False) 感谢@pault 的评论 :) 真的很有帮助,我的回答中包含了您的评论【参考方案3】:

我认为 pandas_udf 可以更轻松地处理这个问题。首先,您需要创建一个 pandas UDF,它接受一个 Series 并为重复的行返回 True。然后,只需使用 withColumn 标记重复的行。这是我建议的代码:

@pandas_udf('boolean')
def duplicate_finder(s: pd.Series) -> pd.Series:
    return s.duplicated(keep=False)

df.withColumn('Duplicated', duplicate_finder('DESIRED_COLUMN')).show()

【讨论】:

【参考方案4】:

使用所有列对数据框进行分区,然后应用dense_rank。

import sys
from pyspark.sql.functions import dense_rank
from pyspark.sql import window as w

df.withColumn('e', dense_rank().over(w.Window.partitionBy(df.columns))).show()

【讨论】:

AnalysisException: u'Window function dense_rank() requires window to be ordered【参考方案5】:

df1=df_interr.groupBy("Item_group","Item_name","price").count().filter("count > 1")

【讨论】:

以上是关于PySpark - 获取重复行的索引的主要内容,如果未能解决你的问题,请参考以下文章

查找重复行的索引 [重复]

PySpark - 从 UDF 获取行索引

从 TypeScript 中的行 ID 获取表行索引 [重复]

Pyspark SQL拆分数据框行的记录[重复]

获取插入行的ID [重复]

从 Hive 表中获取与重复行的差异