PySpark - 获取重复行的索引
Posted
技术标签:
【中文标题】PySpark - 获取重复行的索引【英文标题】:PySpark - Get indices of duplicate rows 【发布时间】:2018-06-14 20:55:33 【问题描述】:假设我有一个 PySpark 数据框,如下所示:
+--+--+--+--+
|a |b |c |d |
+--+--+--+--+
|1 |0 |1 |2 |
|0 |2 |0 |1 |
|1 |0 |1 |2 |
|0 |4 |3 |1 |
+--+--+--+--+
如何创建一个标记所有重复行的列,如下所示:
+--+--+--+--+--+
|a |b |c |d |e |
+--+--+--+--+--+
|1 |0 |1 |2 |1 |
|0 |2 |0 |1 |0 |
|1 |0 |1 |2 |1 |
|0 |4 |3 |1 |0 |
+--+--+--+--+--+
我尝试使用 groupBy 和聚合函数无济于事。
【问题讨论】:
不是完全的欺骗,但this answer 是一种方法。试试看:df.groupBy(df.columns).count().show()
【参考方案1】:
只是为了扩展my comment:
您可以按所有列分组并使用pyspark.sql.functions.count()
确定列是否重复:
import pyspark.sql.functions as f
df.groupBy(df.columns).agg((f.count("*")>1).cast("int").alias("e")).show()
#+---+---+---+---+---+
#| a| b| c| d| e|
#+---+---+---+---+---+
#| 1| 0| 1| 2| 1|
#| 0| 2| 0| 1| 0|
#| 0| 4| 3| 1| 0|
#+---+---+---+---+---+
这里我们使用count("*") > 1
作为聚合函数,并将结果转换为int
。 groupBy()
将导致删除重复行。根据您的需要,这可能就足够了。
但是,如果您想保留所有行,可以使用 Window
函数,如其他答案所示,或者您可以使用 join()
:
df.join(
df.groupBy(df.columns).agg((f.count("*")>1).cast("int").alias("e")),
on=df.columns,
how="inner"
).show()
#+---+---+---+---+---+
#| a| b| c| d| e|
#+---+---+---+---+---+
#| 1| 0| 1| 2| 1|
#| 1| 0| 1| 2| 1|
#| 0| 2| 0| 1| 0|
#| 0| 4| 3| 1| 0|
#+---+---+---+---+---+
在这里,我们将原始数据帧与上面所有列上groupBy()
的结果进行内部连接。
【讨论】:
【参考方案2】:定义一个window
函数来检查所有列分组时行的count
是否大于1。如果是,则重复(1)否则不重复(0)
allColumns = df.columns
import sys
from pyspark.sql import functions as f
from pyspark.sql import window as w
windowSpec = w.Window.partitionBy(allColumns).rowsBetween(-sys.maxint, sys.maxint)
df.withColumn('e', f.when(f.count(f.col('d')).over(windowSpec) > 1, f.lit(1)).otherwise(f.lit(0))).show(truncate=False)
这应该给你
+---+---+---+---+---+
|a |b |c |d |e |
+---+---+---+---+---+
|1 |0 |1 |2 |1 |
|1 |0 |1 |2 |1 |
|0 |2 |0 |1 |0 |
|0 |4 |3 |1 |0 |
+---+---+---+---+---+
希望回答对你有帮助
更新
作为@pault commented,您可以通过将boolean
转换为integer
来消除when
、col
和lit
:
df.withColumn('e', (f.count('*').over(windowSpec) > 1).cast('int')).show(truncate=False)
【讨论】:
这里不需要when
、col
或lit
- 您可以将条件转换为整数:df.withColumn('e', (f.count('*').over(windowSpec) > 1).cast('int')).show(truncate=False)
感谢@pault 的评论 :) 真的很有帮助,我的回答中包含了您的评论【参考方案3】:
我认为 pandas_udf 可以更轻松地处理这个问题。首先,您需要创建一个 pandas UDF,它接受一个 Series 并为重复的行返回 True。然后,只需使用 withColumn 标记重复的行。这是我建议的代码:
@pandas_udf('boolean')
def duplicate_finder(s: pd.Series) -> pd.Series:
return s.duplicated(keep=False)
df.withColumn('Duplicated', duplicate_finder('DESIRED_COLUMN')).show()
【讨论】:
【参考方案4】:使用所有列对数据框进行分区,然后应用dense_rank。
import sys
from pyspark.sql.functions import dense_rank
from pyspark.sql import window as w
df.withColumn('e', dense_rank().over(w.Window.partitionBy(df.columns))).show()
【讨论】:
AnalysisException: u'Window function dense_rank() requires window to be ordered
【参考方案5】:
df1=df_interr.groupBy("Item_group","Item_name","price").count().filter("count > 1")
【讨论】:
以上是关于PySpark - 获取重复行的索引的主要内容,如果未能解决你的问题,请参考以下文章