火花数据框删除重复并保留第一

Posted

技术标签:

【中文标题】火花数据框删除重复并保留第一【英文标题】:spark dataframe drop duplicates and keep first 【发布时间】:2016-12-05 20:19:49 【问题描述】:

问题:在 pandas 中删除重复项时,您可以指定要保留哪些列。 Spark Dataframes 中是否有等价物?

熊猫:

df.sort_values('actual_datetime', ascending=False).drop_duplicates(subset=['scheduled_datetime', 'flt_flightnumber'], keep='first')

Spark 数据框(我使用 Spark 1.6.0)没有保留选项

df.orderBy(['actual_datetime']).dropDuplicates(subset=['scheduled_datetime', 'flt_flightnumber'])

假设 scheduled_datetimeflt_flightnumber 是第 6 ,17 列。通过基于这些列的值创建键,我们还可以进行重复数据删除

def get_key(x):
    return "01".format(x[6],x[17])

df= df.map(lambda x: (get_key(x),x)).reduceByKey(lambda x,y: (x))

但是如何指定保留第一行并去掉其他重复项?最后一行呢?

【问题讨论】:

当您运行dropDuplicates 时,您将根据指定的列组合保留第一行并摆脱其余的欺骗。你确定你的代码没有做你想做的事吗? 我测试过,看起来确实如此。 但是如果我想保留最后一行怎么办?我只查看一列的重复值 我认为如果你想保留最后一行,那么你应该先降序排序,然后删除重复。 【参考方案1】:

对于所有说 dropDuplicates 保留第一次出现的人 - 这并不完全正确。

dropDuplicates 保留排序操作的“第一次出现” - 仅当存在 1 个分区时。请参阅下面的一些示例。 然而,这对于大多数 Spark 数据集并不实用。因此,我还包括一个使用 Window 函数 + 排序 + 排名 + 过滤器的“首次出现”删除重复操作的示例。例如,请参见帖子底部。

这是在 Spark 2.4.0 中使用 pyspark 测试的。

dropDuplicates 示例

import pandas as pd

# generating some example data with pandas, will convert to spark df below
df1 = pd.DataFrame('col1':range(0,5))
df1['datestr'] = '2018-01-01'
df2 = pd.DataFrame('col1':range(0,5))
df2['datestr'] = '2018-02-01'
df3 = pd.DataFrame('col1':range(0,5))
df3['datestr'] = '2018-03-01'
dfall = pd.concat([df1,df2,df3])
print(dfall)
   col1     datestr
0     0  2018-01-01
1     1  2018-01-01
2     2  2018-01-01
3     3  2018-01-01
4     4  2018-01-01
0     0  2018-02-01
1     1  2018-02-01
2     2  2018-02-01
3     3  2018-02-01
4     4  2018-02-01
0     0  2018-03-01
1     1  2018-03-01
2     2  2018-03-01
3     3  2018-03-01
4     4  2018-03-01
# first example
# does not give first (based on datestr)
(spark.createDataFrame(dfall)
   .orderBy('datestr')
   .dropDuplicates(subset = ['col1'])
   .show()
)

# dropDuplicates NOT based on occurrence of sorted datestr
+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-03-01|
|   1|2018-02-01|
|   3|2018-02-01|
|   2|2018-02-01|
|   4|2018-01-01|
+----+----------+
# second example
# testing what happens with repartition
(spark.createDataFrame(dfall)
   .orderBy('datestr')
   .repartition('datestr')
   .dropDuplicates(subset = ['col1'])
   .show()
)

# dropDuplicates NOT based on occurrence of sorted datestr

+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-02-01|
|   1|2018-01-01|
|   3|2018-02-01|
|   2|2018-02-01|
|   4|2018-02-01|
+----+----------+
#third example
# testing with coalesce(1)
(spark
   .createDataFrame(dfall)
   .orderBy('datestr')
   .coalesce(1)
   .dropDuplicates(subset = ['col1'])
   .show()
)

# dropDuplicates based on occurrence of sorted datestr
+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-01-01|
|   1|2018-01-01|
|   2|2018-01-01|
|   3|2018-01-01|
|   4|2018-01-01|
+----+----------+
# fourth example
# testing with reverse sort then coalesce(1)
(spark
   .createDataFrame(dfall)
   .orderBy('datestr', ascending = False)
   .coalesce(1)
   .dropDuplicates(subset = ['col1'])
   .show()
)
# dropDuplicates based on occurrence of sorted datestr```
+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-03-01|
|   1|2018-03-01|
|   2|2018-03-01|
|   3|2018-03-01|
|   4|2018-03-01|
+----+----------+

窗口、排序、排名、过滤器示例

# generating some example data with pandas
df1 = pd.DataFrame('col1':range(0,5))
df1['datestr'] = '2018-01-01'
df2 = pd.DataFrame('col1':range(0,5))
df2['datestr'] = '2018-02-01'
df3 = pd.DataFrame('col1':range(0,5))
df3['datestr'] = '2018-03-01'
dfall = pd.concat([df1,df2,df3])
# into spark df
df_s = (spark.createDataFrame(dfall))
from pyspark.sql import Window
from pyspark.sql.functions import rank
window = Window.partitionBy("col1").orderBy("datestr")
(df_s.withColumn('rank', rank().over(window))
.filter(col('rank') == 1)
.drop('rank')
.show()
)
+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-01-01|
|   1|2018-01-01|
|   3|2018-01-01|
|   2|2018-01-01|
|   4|2018-01-01|
+----+----------+
# however this fails if ties/duplicates exist in the windowing paritions
# and so a tie breaker for the 'rank' function must be added

# generating some example data with pandas, will convert to spark df below
df1 = pd.DataFrame('col1':range(0,5))
df1['datestr'] = '2018-01-01'
df2 = pd.DataFrame('col1':range(0,5))
df2['datestr'] = '2018-01-01' # note duplicates in this dataset
df3 = pd.DataFrame('col1':range(0,5))
df3['datestr'] = '2018-03-01'
dfall = pd.concat([df1,df2,df3])
print(dfall)
   col1     datestr
0     0  2018-01-01
1     1  2018-01-01
2     2  2018-01-01
3     3  2018-01-01
4     4  2018-01-01
0     0  2018-01-01
1     1  2018-01-01
2     2  2018-01-01
3     3  2018-01-01
4     4  2018-01-01
0     0  2018-03-01
1     1  2018-03-01
2     2  2018-03-01
3     3  2018-03-01
4     4  2018-03-01
# this will fail, since duplicates exist within the window partitions
# and no way to specify ranking style exists in pyspark rank() fn
window = Window.partitionBy("col1").orderBy("datestr")
(df_s.withColumn('rank', rank().over(window))
.filter(col('rank') == 1)
.drop('rank')
.show()
)
+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-01-01|
|   0|2018-01-01|
|   1|2018-01-01|
|   1|2018-01-01|
|   3|2018-01-01|
|   3|2018-01-01|
|   2|2018-01-01|
|   2|2018-01-01|
|   4|2018-01-01|
|   4|2018-01-01|
+----+----------+
# to deal with ties within window partitions, a tiebreaker column is added
from pyspark.sql import Window
from pyspark.sql.functions import rank, col, monotonically_increasing_id
window = Window.partitionBy("col1").orderBy("datestr",'tiebreak')
(df_s
 .withColumn('tiebreak', monotonically_increasing_id())
 .withColumn('rank', rank().over(window))
 .filter(col('rank') == 1).drop('rank','tiebreak')
 .show()
)
+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-01-01|
|   1|2018-01-01|
|   3|2018-01-01|
|   2|2018-01-01|
|   4|2018-01-01|
+----+----------+

【讨论】:

我用真实数据集(500k+)测试过,.coalesce(1) 是必需的。 以上都没有给出正确的答案。我们需要为这篇文章提供更好的答案。 这个答案几乎是最佳答案。使用 row_number() 函数代替 rank(),而不是允许领带共享相同排名的 'rank'。这比使用决胜列更简单。 很好的例子,但是 coalesce(1) 将所有数据推送到驱动程序,并且体积越大,您会遇到内存问题【参考方案2】:

我做了以下事情:

dataframe.groupBy("uniqueColumn").min("time")

这将按给定列分组,并在同一组中选择具有最短时间的列(这将保留第一个并删除其他列)

【讨论】:

想详细说明@mahmoud-hanafy?使用 goupyby 和 min/max 很清楚.. 但这如何解决重复问题?我们如何确保始终保留最短时间的行? 嗨 KevinG,您需要了解 groupBy 的工作原理。如果你有一个 List((0, 1), (0, 2), (0, 3), (1, 4)) 那么你做 groupBy(_._1) 你将有 => Map(0 -> List (1, 2, 3), 1 -> 列表 (4))。然后在选择您将拥有的最小值之一时 => Map(0 -> 1, 1 -> 4)。所以重复的值将被删除,你将只保留最小的一个 不过,您将需要一个后续连接来保留任何其他列。【参考方案3】:

解决方案 1 添加一个新列第 num(增量列)并在对您感兴趣的所有列进行分组后根据最小行删除重复项。(您可以包括除第 num col 之外的所有列以删除重复项)

解决方案 2: 将数据框转换为 rdd (df.rdd),然后将 rdd 分组在一个或多个或所有键上,然后在组上运行 lambda 函数并以您想要的方式删除行并仅返回您感兴趣的行在。

我的一个朋友(同样)提到以下(旧解决方案)对他不起作用。 默认情况下使用 dropDuplicates 方法,它会保留第一次出现。

【讨论】:

你能提供一个有效的来源吗? @Manrique - 文档在这里 -> spark.apache.org/docs/2.1.0/api/python/… 感谢@GunayAnach,但我在该文档中看不到默认情况下 dropDuplicates 保留第一次出现。 好像是基于pandas的,where -> keep : 'first', 'last', False, default 'first' pandas.pydata.org/pandas-docs/stable/reference/api/… 但是你说得对,文档上没有指定.与熊猫相比,它们看起来相当干燥。 如何在 scala 中保留最后一个元素【参考方案4】:

我刚刚使用 drop_duplicates pyspark 做了一些可能类似于你们需要的事情。

情况是这样的。我有 2 个数据框(来自 2 个文件),除了 2 列 file_date(从文件名中提取的文件日期)和 data_date(行日期戳)之外,它们完全相同。令人讨厌的是,我的行具有相同的 data_date (以及所有其他列单元格)但不同的 file_date 因为它们在每个新的文件中复制并添加了一个新行。

我需要捕获新文件中的所有行,以及前一个文件中剩余的一行。该行不在新文件中。 data_date 右侧的剩余列在相同 data_date 的两个文件之间是相同的。

file_1_20190122 - df1

+------------+----------+----------+
|station_code| file_date| data_date|
+------------+----------+----------+
|        AGGH|2019-01-22|2019-01-16| <- One row we want to keep where file_date 22nd
|        AGGH|2019-01-22|2019-01-17|
|        AGGH|2019-01-22|2019-01-18|
|        AGGH|2019-01-22|2019-01-19|
|        AGGH|2019-01-22|2019-01-20|
|        AGGH|2019-01-22|2019-01-21|
|        AGGH|2019-01-22|2019-01-22|


file_2_20190123 - df2

+------------+----------+----------+
|station_code| file_date| data_date|
+------------+----------+----------+
|        AGGH|2019-01-23|2019-01-17| \/ ALL rows we want to keep where file_date 23rd
|        AGGH|2019-01-23|2019-01-18|
|        AGGH|2019-01-23|2019-01-19|
|        AGGH|2019-01-23|2019-01-20|
|        AGGH|2019-01-23|2019-01-21|
|        AGGH|2019-01-23|2019-01-22|
|        AGGH|2019-01-23|2019-01-23|

这将要求我们对 df 进行排序和连接,然后在除一列之外的所有列上对它们进行重复数据删除。 让我带你过去。

union_df = df1.union(df2) \
                .sort(['station_code', 'data_date'], ascending=[True, True])

+------------+----------+----------+
|station_code| file_date| data_date|
+------------+----------+----------+
|        AGGH|2019-01-22|2019-01-16| <- keep
|        AGGH|2019-01-23|2019-01-17| <- keep
|        AGGH|2019-01-22|2019-01-17| x- drop
|        AGGH|2019-01-22|2019-01-18| x- drop
|        AGGH|2019-01-23|2019-01-18| <- keep
|        AGGH|2019-01-23|2019-01-19| <- keep
|        AGGH|2019-01-22|2019-01-19| x- drop
|        AGGH|2019-01-23|2019-01-20| <- keep
|        AGGH|2019-01-22|2019-01-20| x- drop
|        AGGH|2019-01-22|2019-01-21| x- drop
|        AGGH|2019-01-23|2019-01-21| <- keep
|        AGGH|2019-01-23|2019-01-22| <- keep
|        AGGH|2019-01-22|2019-01-22| x- drop
|        AGGH|2019-01-23|2019-01-23| <- keep

在这里,我们删除已排序的重复行,不包括键 ['file_date', 'data_date']

nonduped_union_df = union_df \
            .drop_duplicates(['station_code', 'data_date', 'time_zone', 
                              'latitude', 'longitude', 'elevation', 
                              'highest_temperature', 'lowest_temperature', 
                              'highest_temperature_10_year_normal', 
                              'another_50_columns'])

结果包含 1 行,其中 DF1 中最早的日期不在 DF2 中,而 DF2 中的所有行

nonduped_union_df.select(['station_code', 'file_date', 'data_date', 
                          'highest_temperature', 'lowest_temperature']) \
                         .sort(['station_code', 'data_date'], ascending=[True, True]) \
                         .show(30)


+------------+----------+----------+-------------------+------------------+
|station_code| file_date| data_date|highest_temperature|lowest_temperature|
+------------+----------+----------+-------------------+------------------+
|        AGGH|2019-01-22|2019-01-16|                 90|                77| <- df1 22nd
|        AGGH|2019-01-23|2019-01-17|                 90|                77| \/- df2 23rd
|        AGGH|2019-01-23|2019-01-18|                 91|                75|
|        AGGH|2019-01-23|2019-01-19|                 88|                77|
|        AGGH|2019-01-23|2019-01-20|                 88|                77|
|        AGGH|2019-01-23|2019-01-21|                 88|                77|
|        AGGH|2019-01-23|2019-01-22|                 90|                75|
|        AGGH|2019-01-23|2019-01-23|                 90|                75|
|        CWCA|2019-01-22|2019-01-15|                 23|                -2|
|        CWCA|2019-01-23|2019-01-16|                  7|                -8|
|        CWCA|2019-01-23|2019-01-17|                 28|                -6|
|        CWCA|2019-01-23|2019-01-18|                  0|               -13|
|        CWCA|2019-01-23|2019-01-19|                 25|               -15|
|        CWCA|2019-01-23|2019-01-20|                 -4|               -18|
|        CWCA|2019-01-23|2019-01-21|                 27|                -6|
|        CWCA|2019-01-22|2019-01-22|                 30|                17|
|        CWCA|2019-01-23|2019-01-22|                 30|                13|
|        CWCO|2019-01-22|2019-01-15|                 34|                29|
|        CWCO|2019-01-23|2019-01-16|                 33|                13|
|        CWCO|2019-01-22|2019-01-16|                 33|                13|
|        CWCO|2019-01-22|2019-01-17|                 23|                 7|
|        CWCO|2019-01-23|2019-01-17|                 23|                 7|
+------------+----------+----------+-------------------+------------------+
only showing top 30 rows

对于这种情况,这可能不是最合适的答案,但它对我有用。

如果卡在某个地方,请告诉我。

顺便说一句 - 如果有人能告诉我如何选择 df 中的所有列,除了没有在列表中列出它们的列 - 我将非常感激。

问候 G

【讨论】:

如何选择 df 中的所有列,除了一个:all_columns_but_one = [c for c in df.columns if c!="undesired_column_name"] df = df.select(all_columns_but_one)【参考方案5】:

您可以使用带有 row_number 的窗口:

import pandas as pd
df1 = pd.DataFrame('col1':range(0,5))
df1['datestr'] = '2018-01-01'
df2 = pd.DataFrame('col1':range(0,5))
df2['datestr'] = '2018-02-01'
df3 = pd.DataFrame('col1':range(0,5))
df3['datestr'] = '2018-03-01'
dfall = spark.createDataFrame(pd.concat([df1,df2,df3]))

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col,row_number
window = Window.partitionBy('col1').orderBy(col('datestr'))
dfall.select('*', row_number().over(window).alias('posicion')).show()
dfall.select('*', row_number().over(window).alias('posicion')).where('posicion ==1').show()

+----+----------+--------+
|col1|   datestr|posicion|
+----+----------+--------+
|   0|2018-01-01|       1|
|   0|2018-02-01|       2|
|   0|2018-03-01|       3|
|   1|2018-01-01|       1|
|   1|2018-02-01|       2|
|   1|2018-03-01|       3|
|   3|2018-01-01|       1|
|   3|2018-02-01|       2|
|   3|2018-03-01|       3|
|   2|2018-01-01|       1|
|   2|2018-02-01|       2|
|   2|2018-03-01|       3|
|   4|2018-01-01|       1|
|   4|2018-02-01|       2|
|   4|2018-03-01|       3|
+----+----------+--------+
+----+----------+--------+
|col1|   datestr|posicion|
+----+----------+--------+
|   0|2018-01-01|       1|
|   1|2018-01-01|       1|
|   3|2018-01-01|       1|
|   2|2018-01-01|       1|
|   4|2018-01-01|       1|
+----+----------+--------+

【讨论】:

【参考方案6】:

我会尝试这种方式:

假设您的 data_df 看起来像这样,并且我们希望在每个 datestr 中保留 col1 中具有最高值的行:

  col1     datestr
     0  2018-01-01
     1  2018-01-01
     2  2018-01-01
     3  2018-01-01
     4  2018-01-01
     0  2018-02-01
     1  2018-02-01
     2  2018-02-01
     3  2018-02-01
     4  2018-02-01
     0  2018-03-01
     1  2018-03-01
     2  2018-03-01
     3  2018-03-01
     4  2018-03-01

你可以这样做:

from pyspark.sql import Window 
import pyspark.sql.functions as F

w = Window.partitionBy('datestr')
data_df = data_df.withColumn("max", F.max(F.col("col1"))\
    .over(w))\
    .where(F.col('max') == F.col('col1'))\
    .drop("max")

这会导致:

  col1     datestr
     4  2018-01-01
     4  2018-02-01
     4  2018-03-01

【讨论】:

【参考方案7】:

如果数据集不是很大,则转换为 pandas 数据框并删除重复项,保持最后或第一个,然后再转换回来。

【讨论】:

您好,感谢您分享您的想法。问题不在于使用 pandas DF 还是 Spark DF。改变技术并不简单。请详细阅读问题。【参考方案8】:

使用windowrow_number 函数。 按升序或降序排序以选择第一个或最后一个。

from pyspark.sql import Window
from pyspark.sql import functions as f

window = Window.partitionBy("col1").orderBy("datestr").asc()
df = (df.withColumn('row', f.row_number().over(window))\
.filter(col('row') == 1)
.drop('row')
.show())

【讨论】:

这个答案已经存在***.com/a/58540058/1386551【参考方案9】:

鉴于下表:

+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-01-01|
|   1|2018-01-01|
|   2|2018-01-01|
|   3|2018-01-01|
|   4|2018-01-01|
|   0|2018-02-01|
|   1|2018-02-01|
|   2|2018-02-01|
|   3|2018-02-01|
|   4|2018-02-01|
|   0|2018-03-01|
|   1|2018-03-01|
|   2|2018-03-01|
|   3|2018-03-01|
|   4|2018-03-01|
+----+----------+

您可以分两步完成:

根据 col1 按给定表分组并选择最小日期。

+----+----------+
|col1|   datestr|
+----+----------+
|   0|2018-01-01|
|   1|2018-01-01|
|   2|2018-01-01|
|   3|2018-01-01|
|   4|2018-01-01|
+----+----------+

left 将结果表与 col1 和 min_datestr 上的原始表连接起来。

【讨论】:

以上是关于火花数据框删除重复并保留第一的主要内容,如果未能解决你的问题,请参考以下文章

怎么删除ACCESS中的重复记录 只保留一条

mysql删除重复数据,只保留第一条(或最后一条)

【R去重】 保留第一个重复/去所有的重复

按给定列表的顺序选择重复的熊猫数据框行并保留原始索引

通过保留第一次出现的节点来删除 XSLT 属性中的重复项

删除一张表中重复数据并保留一条ID最小的记录