火花数据框删除重复并保留第一
Posted
技术标签:
【中文标题】火花数据框删除重复并保留第一【英文标题】:spark dataframe drop duplicates and keep first 【发布时间】:2016-12-05 20:19:49 【问题描述】:问题:在 pandas 中删除重复项时,您可以指定要保留哪些列。 Spark Dataframes 中是否有等价物?
熊猫:
df.sort_values('actual_datetime', ascending=False).drop_duplicates(subset=['scheduled_datetime', 'flt_flightnumber'], keep='first')
Spark 数据框(我使用 Spark 1.6.0)没有保留选项
df.orderBy(['actual_datetime']).dropDuplicates(subset=['scheduled_datetime', 'flt_flightnumber'])
假设 scheduled_datetime
和 flt_flightnumber
是第 6 ,17 列。通过基于这些列的值创建键,我们还可以进行重复数据删除
def get_key(x):
return "01".format(x[6],x[17])
df= df.map(lambda x: (get_key(x),x)).reduceByKey(lambda x,y: (x))
但是如何指定保留第一行并去掉其他重复项?最后一行呢?
【问题讨论】:
当您运行dropDuplicates
时,您将根据指定的列组合保留第一行并摆脱其余的欺骗。你确定你的代码没有做你想做的事吗?
我测试过,看起来确实如此。
但是如果我想保留最后一行怎么办?我只查看一列的重复值
我认为如果你想保留最后一行,那么你应该先降序排序,然后删除重复。
【参考方案1】:
对于所有说 dropDuplicates 保留第一次出现的人 - 这并不完全正确。
dropDuplicates 保留排序操作的“第一次出现” - 仅当存在 1 个分区时。请参阅下面的一些示例。 然而,这对于大多数 Spark 数据集并不实用。因此,我还包括一个使用 Window 函数 + 排序 + 排名 + 过滤器的“首次出现”删除重复操作的示例。例如,请参见帖子底部。
这是在 Spark 2.4.0 中使用 pyspark 测试的。
dropDuplicates 示例
import pandas as pd
# generating some example data with pandas, will convert to spark df below
df1 = pd.DataFrame('col1':range(0,5))
df1['datestr'] = '2018-01-01'
df2 = pd.DataFrame('col1':range(0,5))
df2['datestr'] = '2018-02-01'
df3 = pd.DataFrame('col1':range(0,5))
df3['datestr'] = '2018-03-01'
dfall = pd.concat([df1,df2,df3])
print(dfall)
col1 datestr
0 0 2018-01-01
1 1 2018-01-01
2 2 2018-01-01
3 3 2018-01-01
4 4 2018-01-01
0 0 2018-02-01
1 1 2018-02-01
2 2 2018-02-01
3 3 2018-02-01
4 4 2018-02-01
0 0 2018-03-01
1 1 2018-03-01
2 2 2018-03-01
3 3 2018-03-01
4 4 2018-03-01
# first example
# does not give first (based on datestr)
(spark.createDataFrame(dfall)
.orderBy('datestr')
.dropDuplicates(subset = ['col1'])
.show()
)
# dropDuplicates NOT based on occurrence of sorted datestr
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-03-01|
| 1|2018-02-01|
| 3|2018-02-01|
| 2|2018-02-01|
| 4|2018-01-01|
+----+----------+
# second example
# testing what happens with repartition
(spark.createDataFrame(dfall)
.orderBy('datestr')
.repartition('datestr')
.dropDuplicates(subset = ['col1'])
.show()
)
# dropDuplicates NOT based on occurrence of sorted datestr
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-02-01|
| 1|2018-01-01|
| 3|2018-02-01|
| 2|2018-02-01|
| 4|2018-02-01|
+----+----------+
#third example
# testing with coalesce(1)
(spark
.createDataFrame(dfall)
.orderBy('datestr')
.coalesce(1)
.dropDuplicates(subset = ['col1'])
.show()
)
# dropDuplicates based on occurrence of sorted datestr
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-01-01|
| 1|2018-01-01|
| 2|2018-01-01|
| 3|2018-01-01|
| 4|2018-01-01|
+----+----------+
# fourth example
# testing with reverse sort then coalesce(1)
(spark
.createDataFrame(dfall)
.orderBy('datestr', ascending = False)
.coalesce(1)
.dropDuplicates(subset = ['col1'])
.show()
)
# dropDuplicates based on occurrence of sorted datestr```
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-03-01|
| 1|2018-03-01|
| 2|2018-03-01|
| 3|2018-03-01|
| 4|2018-03-01|
+----+----------+
窗口、排序、排名、过滤器示例
# generating some example data with pandas
df1 = pd.DataFrame('col1':range(0,5))
df1['datestr'] = '2018-01-01'
df2 = pd.DataFrame('col1':range(0,5))
df2['datestr'] = '2018-02-01'
df3 = pd.DataFrame('col1':range(0,5))
df3['datestr'] = '2018-03-01'
dfall = pd.concat([df1,df2,df3])
# into spark df
df_s = (spark.createDataFrame(dfall))
from pyspark.sql import Window
from pyspark.sql.functions import rank
window = Window.partitionBy("col1").orderBy("datestr")
(df_s.withColumn('rank', rank().over(window))
.filter(col('rank') == 1)
.drop('rank')
.show()
)
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-01-01|
| 1|2018-01-01|
| 3|2018-01-01|
| 2|2018-01-01|
| 4|2018-01-01|
+----+----------+
# however this fails if ties/duplicates exist in the windowing paritions
# and so a tie breaker for the 'rank' function must be added
# generating some example data with pandas, will convert to spark df below
df1 = pd.DataFrame('col1':range(0,5))
df1['datestr'] = '2018-01-01'
df2 = pd.DataFrame('col1':range(0,5))
df2['datestr'] = '2018-01-01' # note duplicates in this dataset
df3 = pd.DataFrame('col1':range(0,5))
df3['datestr'] = '2018-03-01'
dfall = pd.concat([df1,df2,df3])
print(dfall)
col1 datestr
0 0 2018-01-01
1 1 2018-01-01
2 2 2018-01-01
3 3 2018-01-01
4 4 2018-01-01
0 0 2018-01-01
1 1 2018-01-01
2 2 2018-01-01
3 3 2018-01-01
4 4 2018-01-01
0 0 2018-03-01
1 1 2018-03-01
2 2 2018-03-01
3 3 2018-03-01
4 4 2018-03-01
# this will fail, since duplicates exist within the window partitions
# and no way to specify ranking style exists in pyspark rank() fn
window = Window.partitionBy("col1").orderBy("datestr")
(df_s.withColumn('rank', rank().over(window))
.filter(col('rank') == 1)
.drop('rank')
.show()
)
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-01-01|
| 0|2018-01-01|
| 1|2018-01-01|
| 1|2018-01-01|
| 3|2018-01-01|
| 3|2018-01-01|
| 2|2018-01-01|
| 2|2018-01-01|
| 4|2018-01-01|
| 4|2018-01-01|
+----+----------+
# to deal with ties within window partitions, a tiebreaker column is added
from pyspark.sql import Window
from pyspark.sql.functions import rank, col, monotonically_increasing_id
window = Window.partitionBy("col1").orderBy("datestr",'tiebreak')
(df_s
.withColumn('tiebreak', monotonically_increasing_id())
.withColumn('rank', rank().over(window))
.filter(col('rank') == 1).drop('rank','tiebreak')
.show()
)
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-01-01|
| 1|2018-01-01|
| 3|2018-01-01|
| 2|2018-01-01|
| 4|2018-01-01|
+----+----------+
【讨论】:
我用真实数据集(500k+)测试过,.coalesce(1) 是必需的。 以上都没有给出正确的答案。我们需要为这篇文章提供更好的答案。 这个答案几乎是最佳答案。使用 row_number() 函数代替 rank(),而不是允许领带共享相同排名的 'rank'。这比使用决胜列更简单。 很好的例子,但是 coalesce(1) 将所有数据推送到驱动程序,并且体积越大,您会遇到内存问题【参考方案2】:我做了以下事情:
dataframe.groupBy("uniqueColumn").min("time")
这将按给定列分组,并在同一组中选择具有最短时间的列(这将保留第一个并删除其他列)
【讨论】:
想详细说明@mahmoud-hanafy?使用 goupyby 和 min/max 很清楚.. 但这如何解决重复问题?我们如何确保始终保留最短时间的行? 嗨 KevinG,您需要了解 groupBy 的工作原理。如果你有一个 List((0, 1), (0, 2), (0, 3), (1, 4)) 那么你做 groupBy(_._1) 你将有 => Map(0 -> List (1, 2, 3), 1 -> 列表 (4))。然后在选择您将拥有的最小值之一时 => Map(0 -> 1, 1 -> 4)。所以重复的值将被删除,你将只保留最小的一个 不过,您将需要一个后续连接来保留任何其他列。【参考方案3】:解决方案 1 添加一个新列第 num(增量列)并在对您感兴趣的所有列进行分组后根据最小行删除重复项。(您可以包括除第 num col 之外的所有列以删除重复项)
解决方案 2: 将数据框转换为 rdd (df.rdd),然后将 rdd 分组在一个或多个或所有键上,然后在组上运行 lambda 函数并以您想要的方式删除行并仅返回您感兴趣的行在。
我的一个朋友(同样)提到以下(旧解决方案)对他不起作用。 默认情况下使用 dropDuplicates 方法,它会保留第一次出现。
【讨论】:
你能提供一个有效的来源吗? @Manrique - 文档在这里 -> spark.apache.org/docs/2.1.0/api/python/… 感谢@GunayAnach,但我在该文档中看不到默认情况下 dropDuplicates 保留第一次出现。 好像是基于pandas的,where -> keep : 'first', 'last', False, default 'first' pandas.pydata.org/pandas-docs/stable/reference/api/… 但是你说得对,文档上没有指定.与熊猫相比,它们看起来相当干燥。 如何在 scala 中保留最后一个元素【参考方案4】:我刚刚使用 drop_duplicates pyspark 做了一些可能类似于你们需要的事情。
情况是这样的。我有 2 个数据框(来自 2 个文件),除了 2 列 file_date(从文件名中提取的文件日期)和 data_date(行日期戳)之外,它们完全相同。令人讨厌的是,我的行具有相同的 data_date (以及所有其他列单元格)但不同的 file_date 因为它们在每个新的文件中复制并添加了一个新行。
我需要捕获新文件中的所有行,以及前一个文件中剩余的一行。该行不在新文件中。 data_date 右侧的剩余列在相同 data_date 的两个文件之间是相同的。
file_1_20190122 - df1
+------------+----------+----------+
|station_code| file_date| data_date|
+------------+----------+----------+
| AGGH|2019-01-22|2019-01-16| <- One row we want to keep where file_date 22nd
| AGGH|2019-01-22|2019-01-17|
| AGGH|2019-01-22|2019-01-18|
| AGGH|2019-01-22|2019-01-19|
| AGGH|2019-01-22|2019-01-20|
| AGGH|2019-01-22|2019-01-21|
| AGGH|2019-01-22|2019-01-22|
file_2_20190123 - df2
+------------+----------+----------+
|station_code| file_date| data_date|
+------------+----------+----------+
| AGGH|2019-01-23|2019-01-17| \/ ALL rows we want to keep where file_date 23rd
| AGGH|2019-01-23|2019-01-18|
| AGGH|2019-01-23|2019-01-19|
| AGGH|2019-01-23|2019-01-20|
| AGGH|2019-01-23|2019-01-21|
| AGGH|2019-01-23|2019-01-22|
| AGGH|2019-01-23|2019-01-23|
这将要求我们对 df 进行排序和连接,然后在除一列之外的所有列上对它们进行重复数据删除。 让我带你过去。
union_df = df1.union(df2) \
.sort(['station_code', 'data_date'], ascending=[True, True])
+------------+----------+----------+
|station_code| file_date| data_date|
+------------+----------+----------+
| AGGH|2019-01-22|2019-01-16| <- keep
| AGGH|2019-01-23|2019-01-17| <- keep
| AGGH|2019-01-22|2019-01-17| x- drop
| AGGH|2019-01-22|2019-01-18| x- drop
| AGGH|2019-01-23|2019-01-18| <- keep
| AGGH|2019-01-23|2019-01-19| <- keep
| AGGH|2019-01-22|2019-01-19| x- drop
| AGGH|2019-01-23|2019-01-20| <- keep
| AGGH|2019-01-22|2019-01-20| x- drop
| AGGH|2019-01-22|2019-01-21| x- drop
| AGGH|2019-01-23|2019-01-21| <- keep
| AGGH|2019-01-23|2019-01-22| <- keep
| AGGH|2019-01-22|2019-01-22| x- drop
| AGGH|2019-01-23|2019-01-23| <- keep
在这里,我们删除已排序的重复行,不包括键 ['file_date', 'data_date']。
nonduped_union_df = union_df \
.drop_duplicates(['station_code', 'data_date', 'time_zone',
'latitude', 'longitude', 'elevation',
'highest_temperature', 'lowest_temperature',
'highest_temperature_10_year_normal',
'another_50_columns'])
结果包含 1 行,其中 DF1 中最早的日期不在 DF2 中,而 DF2 中的所有行
nonduped_union_df.select(['station_code', 'file_date', 'data_date',
'highest_temperature', 'lowest_temperature']) \
.sort(['station_code', 'data_date'], ascending=[True, True]) \
.show(30)
+------------+----------+----------+-------------------+------------------+
|station_code| file_date| data_date|highest_temperature|lowest_temperature|
+------------+----------+----------+-------------------+------------------+
| AGGH|2019-01-22|2019-01-16| 90| 77| <- df1 22nd
| AGGH|2019-01-23|2019-01-17| 90| 77| \/- df2 23rd
| AGGH|2019-01-23|2019-01-18| 91| 75|
| AGGH|2019-01-23|2019-01-19| 88| 77|
| AGGH|2019-01-23|2019-01-20| 88| 77|
| AGGH|2019-01-23|2019-01-21| 88| 77|
| AGGH|2019-01-23|2019-01-22| 90| 75|
| AGGH|2019-01-23|2019-01-23| 90| 75|
| CWCA|2019-01-22|2019-01-15| 23| -2|
| CWCA|2019-01-23|2019-01-16| 7| -8|
| CWCA|2019-01-23|2019-01-17| 28| -6|
| CWCA|2019-01-23|2019-01-18| 0| -13|
| CWCA|2019-01-23|2019-01-19| 25| -15|
| CWCA|2019-01-23|2019-01-20| -4| -18|
| CWCA|2019-01-23|2019-01-21| 27| -6|
| CWCA|2019-01-22|2019-01-22| 30| 17|
| CWCA|2019-01-23|2019-01-22| 30| 13|
| CWCO|2019-01-22|2019-01-15| 34| 29|
| CWCO|2019-01-23|2019-01-16| 33| 13|
| CWCO|2019-01-22|2019-01-16| 33| 13|
| CWCO|2019-01-22|2019-01-17| 23| 7|
| CWCO|2019-01-23|2019-01-17| 23| 7|
+------------+----------+----------+-------------------+------------------+
only showing top 30 rows
对于这种情况,这可能不是最合适的答案,但它对我有用。
如果卡在某个地方,请告诉我。
顺便说一句 - 如果有人能告诉我如何选择 df 中的所有列,除了没有在列表中列出它们的列 - 我将非常感激。
问候 G
【讨论】:
如何选择 df 中的所有列,除了一个:all_columns_but_one = [c for c in df.columns if c!="undesired_column_name"] df = df.select(all_columns_but_one)
【参考方案5】:
您可以使用带有 row_number 的窗口:
import pandas as pd
df1 = pd.DataFrame('col1':range(0,5))
df1['datestr'] = '2018-01-01'
df2 = pd.DataFrame('col1':range(0,5))
df2['datestr'] = '2018-02-01'
df3 = pd.DataFrame('col1':range(0,5))
df3['datestr'] = '2018-03-01'
dfall = spark.createDataFrame(pd.concat([df1,df2,df3]))
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col,row_number
window = Window.partitionBy('col1').orderBy(col('datestr'))
dfall.select('*', row_number().over(window).alias('posicion')).show()
dfall.select('*', row_number().over(window).alias('posicion')).where('posicion ==1').show()
+----+----------+--------+
|col1| datestr|posicion|
+----+----------+--------+
| 0|2018-01-01| 1|
| 0|2018-02-01| 2|
| 0|2018-03-01| 3|
| 1|2018-01-01| 1|
| 1|2018-02-01| 2|
| 1|2018-03-01| 3|
| 3|2018-01-01| 1|
| 3|2018-02-01| 2|
| 3|2018-03-01| 3|
| 2|2018-01-01| 1|
| 2|2018-02-01| 2|
| 2|2018-03-01| 3|
| 4|2018-01-01| 1|
| 4|2018-02-01| 2|
| 4|2018-03-01| 3|
+----+----------+--------+
+----+----------+--------+
|col1| datestr|posicion|
+----+----------+--------+
| 0|2018-01-01| 1|
| 1|2018-01-01| 1|
| 3|2018-01-01| 1|
| 2|2018-01-01| 1|
| 4|2018-01-01| 1|
+----+----------+--------+
【讨论】:
【参考方案6】:我会尝试这种方式:
假设您的 data_df 看起来像这样,并且我们希望在每个 datestr 中保留 col1 中具有最高值的行:
col1 datestr
0 2018-01-01
1 2018-01-01
2 2018-01-01
3 2018-01-01
4 2018-01-01
0 2018-02-01
1 2018-02-01
2 2018-02-01
3 2018-02-01
4 2018-02-01
0 2018-03-01
1 2018-03-01
2 2018-03-01
3 2018-03-01
4 2018-03-01
你可以这样做:
from pyspark.sql import Window
import pyspark.sql.functions as F
w = Window.partitionBy('datestr')
data_df = data_df.withColumn("max", F.max(F.col("col1"))\
.over(w))\
.where(F.col('max') == F.col('col1'))\
.drop("max")
这会导致:
col1 datestr
4 2018-01-01
4 2018-02-01
4 2018-03-01
【讨论】:
【参考方案7】:如果数据集不是很大,则转换为 pandas 数据框并删除重复项,保持最后或第一个,然后再转换回来。
【讨论】:
您好,感谢您分享您的想法。问题不在于使用 pandas DF 还是 Spark DF。改变技术并不简单。请详细阅读问题。【参考方案8】:使用window
和row_number
函数。
按升序或降序排序以选择第一个或最后一个。
from pyspark.sql import Window
from pyspark.sql import functions as f
window = Window.partitionBy("col1").orderBy("datestr").asc()
df = (df.withColumn('row', f.row_number().over(window))\
.filter(col('row') == 1)
.drop('row')
.show())
【讨论】:
这个答案已经存在***.com/a/58540058/1386551【参考方案9】:鉴于下表:
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-01-01|
| 1|2018-01-01|
| 2|2018-01-01|
| 3|2018-01-01|
| 4|2018-01-01|
| 0|2018-02-01|
| 1|2018-02-01|
| 2|2018-02-01|
| 3|2018-02-01|
| 4|2018-02-01|
| 0|2018-03-01|
| 1|2018-03-01|
| 2|2018-03-01|
| 3|2018-03-01|
| 4|2018-03-01|
+----+----------+
您可以分两步完成:
根据 col1 按给定表分组并选择最小日期。
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-01-01|
| 1|2018-01-01|
| 2|2018-01-01|
| 3|2018-01-01|
| 4|2018-01-01|
+----+----------+
left 将结果表与 col1 和 min_datestr 上的原始表连接起来。
【讨论】:
以上是关于火花数据框删除重复并保留第一的主要内容,如果未能解决你的问题,请参考以下文章