Pyspark - 从数据框中删除重复项,保持最后一次出现

Posted

技术标签:

【中文标题】Pyspark - 从数据框中删除重复项,保持最后一次出现【英文标题】:Pyspark - remove duplicates from dataframe keeping the last appearance 【发布时间】:2018-11-13 16:00:16 【问题描述】:

我正在尝试对 spark 数据框进行重复数据删除,只保留最新的外观。 重复存在于三个变量中:

NAME
ID
DOB

我在 Pandas 中取得了成功:

df_dedupe = df.drop_duplicates(subset=['NAME','ID','DOB'], keep='last', inplace=False)

但在 spark 中我尝试了以下方法:

df_dedupe = df.dropDuplicates(['NAME', 'ID', 'DOB'], keep='last')

我收到此错误:

TypeError: dropDuplicates() got an unexpected keyword argument 'keep'

有什么想法吗?

【问题讨论】:

quynhcodes.wordpress.com/2016/07/29/… 基于[文档][1] dropDuplicatespyspark 中没有对keep 【参考方案1】:

感谢您的帮助。 我遵循了您的指示,但结果并不如预期:

d1 = [('Bob', '10', '1542189668', '0', '0'),  ('Alice', '10', '1425298030', '154', '39'), ('Bob', '10', '1542189668', '178', '42')]
df1 = spark.createDataFrame(d1, ['NAME', 'ID', 'DOB' , 'Height' , 'ShoeSize'])
df_dedupe = df1.dropDuplicates(['NAME', 'ID', 'DOB'])
df_reverse = df1.sort((["NAME", "ID", "DOB"]), ascending= False)
df_dedupe.join(df_reverse,['NAME','ID','DOB'],'inner')
df_dedupe.show(100, False)

结果是:

+-----+---+----------+------+--------+    
|NAME |ID |DOB       |Height|ShoeSize|
+-----+---+----------+------+--------+
|Bob  |10 |1542189668|0     |0       |
|Alice|10 |1425298030|154   |39      |
+-----+---+----------+------+--------+

显示数据损坏的“Bob”。

最后,我改变了方法,将 DF 转换为 Pandas,然后再转换回 spark:

p_schema = StructType([StructField('NAME',StringType(),True),StructField('ID',StringType(),True),StructField('DOB',StringType(),True),StructField('Height',StringType(),True),StructField('ShoeSize',StringType(),True)])
d1 = [('Bob', '10', '1542189668', '0', '0'),  ('Alice', '10', '1425298030', '154', '39'), ('Bob', '10', '1542189668', '178', '42')]
df = spark.createDataFrame(d1, p_schema)
pdf = df.toPandas()
df_dedupe = pdf.drop_duplicates(subset=['NAME','ID','DOB'], keep='last', inplace=False)

df_spark = spark.createDataFrame(df_dedupe, p_schema)
df_spark.show(100, False)

这终于带来了正确的“鲍勃”:

+-----+---+----------+------+--------+
|NAME |ID |DOB       |Height|ShoeSize|
+-----+---+----------+------+--------+
|Alice|10 |1425298030|154   |39      |
|Bob  |10 |1542189668|178   |42      |
+-----+---+----------+------+--------+

当然,我仍然希望有一个纯 Spark 解决方案,但缺少索引似乎是 Spark 的问题。

谢谢!

【讨论】:

非常感谢您的代码,它对我帮助很大!我做了一个函数来按照你正在做的确切方式执行 AddOrUpdate,它有效!如果有人想让我分享它,请不要犹豫【参考方案2】:

正如您在http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html 函数dropDuplicates(subset=None) 的文档中所见,它只允许子集 作为参数。如果它们都相等,为什么要保留最后一个?

编辑

正如@W-B 所指出的,你想要其他列。我的解决方案是将原始数据帧以相反的顺序排序,并在三个重复的列上使用df_dedupe 进行内部连接并仅保留最后一个值。

df_dedupe.join(original_df,['NAME','ID','DOB'],'inner')

【讨论】:

因为还有其他列,他需要它们的最后一个值

以上是关于Pyspark - 从数据框中删除重复项,保持最后一次出现的主要内容,如果未能解决你的问题,请参考以下文章

从 PySpark 中的数据框中删除重复项

在 pyspark 数据框中使用 write.partitionBy 时如何删除重复项?

从数据框中删除重复项,基于两列 A,B,在另一列 C 中保持具有最大值的行

与使用 Pyspark 的另一个表相比,检查数据框中是不是存在重复项 [重复]

如何从 PySpark Dataframe 中删除重复项并将剩余列值更改为 null

删除 pyspark 中的所有重复实例