获取 Apache Spark 中重复行的 ID(考虑所有其他列)

Posted

技术标签:

【中文标题】获取 Apache Spark 中重复行的 ID(考虑所有其他列)【英文标题】:Get IDs for duplicate rows (considering all other columns) in Apache Spark 【发布时间】:2017-03-29 13:30:09 【问题描述】:

我有一个 Spark sql 数据框,由 ID 列和 n“数据”列组成,即

id | dat1 | dat2 | ... | datn

id 列是唯一确定的,而查看dat1 ... datn 可能存在重复。

我的目标是找到那些重复的ids。

到目前为止我的方法:

使用groupBy获取重复行:

dup_df = df.groupBy(df.columns[1:]).count().filter('count > 1')

dup_df 与整个df 连接起来以获取重复的行包括 id

df.join(dup_df, df.columns[1:])

我很确定这基本上是正确的,它失败了,因为 dat1 ... datn 列包含 null 值。

要对null 值执行join,我找到了.e.g this SO post。但这需要构造一个巨大的“字符串连接条件”。

所以我的问题:

    是否有一种简单/更通用/更pythonic的方式来处理joins null 值? 或者,更好的是,是否有另一种(更简单、更美观……)方法来获得所需的ids?

顺便说一句:我使用的是 Spark 2.1.0 和 Python 3.5.3

【问题讨论】:

【参考方案1】:

如果每个组的数量ids 相对较小,您可以使用groupBycollect_list。所需的进口

from pyspark.sql.functions import collect_list, size

示例数据:

df = sc.parallelize([
    (1, "a", "b", 3),
    (2, None, "f", None),
    (3, "g", "h", 4),
    (4, None, "f", None),
    (5, "a", "b", 3)
]).toDF(["id"])

查询:

(df
   .groupBy(df.columns[1:])
   .agg(collect_list("id").alias("ids"))
   .where(size("ids") > 1))

结果:

+----+---+----+------+
|  _2| _3|  _4|   ids|
+----+---+----+------+
|null|  f|null|[2, 4]|
|   a|  b|   3|[1, 5]|
+----+---+----+------+

您可以将explode 两次(或使用udf)应用于与join 返回的输出等效的输出。

您还可以使用每个组最少的id 来识别组。一些额外的导入:

from pyspark.sql.window import Window
from pyspark.sql.functions import col, count, min

窗口定义:

w = Window.partitionBy(df.columns[1:])

查询:

(df
    .select(
        "*", 
        count("*").over(w).alias("_cnt"), 
        min("id").over(w).alias("group"))
    .where(col("_cnt") > 1))

结果:

+---+----+---+----+----+-----+
| id|  _2| _3|  _4|_cnt|group|
+---+----+---+----+----+-----+
|  2|null|  f|null|   2|    2|
|  4|null|  f|null|   2|    2|
|  1|   a|  b|   3|   2|    1|
|  5|   a|  b|   3|   2|    1|
+---+----+---+----+----+-----+

您可以进一步使用group 列进行自联接。

【讨论】:

以上是关于获取 Apache Spark 中重复行的 ID(考虑所有其他列)的主要内容,如果未能解决你的问题,请参考以下文章

获取插入行的ID [重复]

获取被筛选器从 spark 数据帧中删除的行的示例

获取最后插入行的主键 ID 以运行多个插入操作 [重复]

我想获取我刚刚插入的行的 ID。我可以知道怎么做吗[重复]

Oracle始终获取具有标识的插入行的ID [重复]

为啥不能在 UDF 中访问数据框? [Apache Spark Scala] [重复]