获取 Apache Spark 中重复行的 ID(考虑所有其他列)
Posted
技术标签:
【中文标题】获取 Apache Spark 中重复行的 ID(考虑所有其他列)【英文标题】:Get IDs for duplicate rows (considering all other columns) in Apache Spark 【发布时间】:2017-03-29 13:30:09 【问题描述】:我有一个 Spark sql 数据框,由 ID
列和 n
“数据”列组成,即
id | dat1 | dat2 | ... | datn
id
列是唯一确定的,而查看dat1 ... datn
可能存在重复。
我的目标是找到那些重复的id
s。
到目前为止我的方法:
使用groupBy
获取重复行:
dup_df = df.groupBy(df.columns[1:]).count().filter('count > 1')
将dup_df
与整个df
连接起来以获取重复的行包括 id
:
df.join(dup_df, df.columns[1:])
我很确定这基本上是正确的,它失败了,因为 dat1 ... datn
列包含 null
值。
要对null
值执行join
,我找到了.e.g this SO post。但这需要构造一个巨大的“字符串连接条件”。
所以我的问题:
-
是否有一种简单/更通用/更pythonic的方式来处理
joins
null
值?
或者,更好的是,是否有另一种(更简单、更美观……)方法来获得所需的id
s?
顺便说一句:我使用的是 Spark 2.1.0 和 Python 3.5.3
【问题讨论】:
【参考方案1】:如果每个组的数量ids
相对较小,您可以使用groupBy
和collect_list
。所需的进口
from pyspark.sql.functions import collect_list, size
示例数据:
df = sc.parallelize([
(1, "a", "b", 3),
(2, None, "f", None),
(3, "g", "h", 4),
(4, None, "f", None),
(5, "a", "b", 3)
]).toDF(["id"])
查询:
(df
.groupBy(df.columns[1:])
.agg(collect_list("id").alias("ids"))
.where(size("ids") > 1))
结果:
+----+---+----+------+
| _2| _3| _4| ids|
+----+---+----+------+
|null| f|null|[2, 4]|
| a| b| 3|[1, 5]|
+----+---+----+------+
您可以将explode
两次(或使用udf
)应用于与join
返回的输出等效的输出。
您还可以使用每个组最少的id
来识别组。一些额外的导入:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, count, min
窗口定义:
w = Window.partitionBy(df.columns[1:])
查询:
(df
.select(
"*",
count("*").over(w).alias("_cnt"),
min("id").over(w).alias("group"))
.where(col("_cnt") > 1))
结果:
+---+----+---+----+----+-----+
| id| _2| _3| _4|_cnt|group|
+---+----+---+----+----+-----+
| 2|null| f|null| 2| 2|
| 4|null| f|null| 2| 2|
| 1| a| b| 3| 2| 1|
| 5| a| b| 3| 2| 1|
+---+----+---+----+----+-----+
您可以进一步使用group
列进行自联接。
【讨论】:
以上是关于获取 Apache Spark 中重复行的 ID(考虑所有其他列)的主要内容,如果未能解决你的问题,请参考以下文章