仅保留 DataFrame 中有关某些字段的重复项
Posted
技术标签:
【中文标题】仅保留 DataFrame 中有关某些字段的重复项【英文标题】:Keep only duplicates from a DataFrame regarding some field 【发布时间】:2018-03-29 15:34:27 【问题描述】:我有这个 spark DataFrame:
+---+-----+------+----+------------+------------+
| ID| ID2|Number|Name|Opening_Hour|Closing_Hour|
+---+-----+------+----+------------+------------+
|ALT| QWA| 6|null| 08:59:00| 23:30:00|
|ALT|AUTRE| 2|null| 08:58:00| 23:29:00|
|TDR| QWA| 3|null| 08:57:00| 23:28:00|
|ALT| TEST| 4|null| 08:56:00| 23:27:00|
|ALT| QWA| 6|null| 08:55:00| 23:26:00|
|ALT| QWA| 2|null| 08:54:00| 23:25:00|
|ALT| QWA| 2|null| 08:53:00| 23:24:00|
+---+-----+------+----+------------+------------+
我想获得一个新的数据框,其中仅包含 "ID"
、"ID2"
和 "Number"
三个字段不唯一的行。
表示我想要这个DataFrame:
+---+-----+------+----+------------+------------+
| ID| ID2|Number|Name|Opening_Hour|Closing_Hour|
+---+-----+------+----+------------+------------+
|ALT| QWA| 6|null| 08:59:00| 23:30:00|
|ALT| QWA| 2|null| 08:53:00| 23:24:00|
+---+-----+------+----+------------+------------+
或者可能是一个包含所有重复项的数据框:
+---+-----+------+----+------------+------------+
| ID| ID2|Number|Name|Opening_Hour|Closing_Hour|
+---+-----+------+----+------------+------------+
|ALT| QWA| 6|null| 08:59:00| 23:30:00|
|ALT| QWA| 6|null| 08:55:00| 23:26:00|
|ALT| QWA| 2|null| 08:54:00| 23:25:00|
|ALT| QWA| 2|null| 08:53:00| 23:24:00|
+---+-----+------+----+------------+------------+
【问题讨论】:
【参考方案1】:执行此操作的一种方法是使用pyspark.sql.Window
添加一列,该列计算每行的("ID", "ID2", "Name")
组合的重复数。然后只选择重复数大于1的行。
import pyspark.sql.functions as f
from pyspark.sql import Window
w = Window.partitionBy('ID', 'ID2', 'Number')
df.select('*', f.count('ID').over(w).alias('dupeCount'))\
.where('dupeCount > 1')\
.drop('dupeCount')\
.show()
#+---+---+------+----+------------+------------+
#| ID|ID2|Number|Name|Opening_Hour|Closing_Hour|
#+---+---+------+----+------------+------------+
#|ALT|QWA| 2|null| 08:54:00| 23:25:00|
#|ALT|QWA| 2|null| 08:53:00| 23:24:00|
#|ALT|QWA| 6|null| 08:59:00| 23:30:00|
#|ALT|QWA| 6|null| 08:55:00| 23:26:00|
#+---+---+------+----+------------+------------+
我使用pyspark.sql.functions.count()
来计算每个组中的项目数。这将返回一个包含所有重复项的 DataFrame(您显示的第二个输出)。
如果您只想获得每个("ID", "ID2", "Name")
组合的一行,您可以使用另一个窗口来对行进行排序。
例如,下面我为row_number
添加另一列,并仅选择重复计数大于1且行数等于1的行。这样可以保证每个分组一个行。
w2 = Window.partitionBy('ID', 'ID2', 'Number').orderBy('ID', 'ID2', 'Number')
df.select(
'*',
f.count('ID').over(w).alias('dupeCount'),
f.row_number().over(w2).alias('rowNum')
)\
.where('(dupeCount > 1) AND (rowNum = 1)')\
.drop('dupeCount', 'rowNum')\
.show()
#+---+---+------+----+------------+------------+
#| ID|ID2|Number|Name|Opening_Hour|Closing_Hour|
#+---+---+------+----+------------+------------+
#|ALT|QWA| 2|null| 08:54:00| 23:25:00|
#|ALT|QWA| 6|null| 08:59:00| 23:30:00|
#+---+---+------+----+------------+------------+
【讨论】:
【参考方案2】:这是一种无需 Window 的方法。
具有重复项的 DataFrame
df.exceptAll(df.drop_duplicates(['ID', 'ID2', 'Number'])).show()
# +---+---+------+------------+------------+
# | ID|ID2|Number|Opening_Hour|Closing_Hour|
# +---+---+------+------------+------------+
# |ALT|QWA| 2| 08:53:00| 23:24:00|
# |ALT|QWA| 6| 08:55:00| 23:26:00|
# +---+---+------+------------+------------+
包含所有重复项的 DataFrame(使用 left_anti 连接)
df.join(df.groupBy('ID', 'ID2', 'Number')\
.count().where('count = 1').drop('count'),
on=['ID', 'ID2', 'Number'],
how='left_anti').show()
# +---+---+------+------------+------------+
# | ID|ID2|Number|Opening_Hour|Closing_Hour|
# +---+---+------+------------+------------+
# |ALT|QWA| 2| 08:54:00| 23:25:00|
# |ALT|QWA| 2| 08:53:00| 23:24:00|
# |ALT|QWA| 6| 08:59:00| 23:30:00|
# |ALT|QWA| 6| 08:55:00| 23:26:00|
# +---+---+------+------------+------------+
【讨论】:
请注意,虽然使用exceptAll()
方法似乎比window
更容易实现,但它效率极低且计算量很大。【参考方案3】:
要扩展 pault 的 really great answer:我经常需要将数据帧子集化为仅重复 x 次的条目,并且由于我需要经常这样做,所以我将它变成了一个函数,我只需导入大量我脚本开头的其他辅助函数:
import pyspark.sql.functions as f
from pyspark.sql import Window
def get_entries_with_frequency(df, cols, num):
"""
This function will filter the dataframe df down to all the rows that
have the same values in cols num times. Example: If num=3, col="cartype",
then the function will only return rows where a certain cartype occurs exactly 3 times
in the dataset. If col "cartype" contains the following:
["Mazda", "Seat", "Seat", "VW", "Mercedes", "VW", "VW", "Mercedes", "Seat"],
then the function will only return rows containing "VW" or "Seat"
since these occur exactly 3 times.
df: Pyspark dataframe
cols: Either string column name or list of strings for multiple columns.
num: int - The exact number of times a value (or combination of values,
if cols is a list) has to appear in df.
"""
if type(cols)==str:
cols = [cols]
w = Window.partitionBy(cols)
return df.select('*', f.count(cols[0]).over(w).alias('dupeCount'))\
.where("dupeCount = ".format(num))\
.drop('dupeCount')
【讨论】:
试想一下,如果get_entries_with_frequency
有参数的文档字符串,这个答案会有多棒,所以除了@Thomas 之外的其他人可以使用它。
@pauljohn32 自从我写这篇文章以来已经有一段时间了,我发现很难解释它到底做了什么,但我已经尽力了 ;-)以上是关于仅保留 DataFrame 中有关某些字段的重复项的主要内容,如果未能解决你的问题,请参考以下文章
删除Apache Spark DataFrame中的重复项,并保留尚未删除的值的行?
排除 MS SQL Server 2008 中的“某些”重复行