如何在 Spark 数据框中进行分组和聚合后过滤?

Posted

技术标签:

【中文标题】如何在 Spark 数据框中进行分组和聚合后过滤?【英文标题】:How to filter after group by and aggregate in Spark dataframe? 【发布时间】:2017-09-10 03:46:14 【问题描述】:

我有一个火花数据框 df,其架构如下:

[id:string, label:string, tags:string]

id | label | tag
---|-------|-----
 1 | h     | null
 1 | w     | x
 1 | v     | null
 1 | v     | x
 2 | h     | x
 3 | h     | x
 3 | w     | x
 3 | v     | null
 3 | v     | null
 4 | h     | null
 4 | w     | x
 5 | w     | x

(h,w,v 是标签。x 可以是任何非空值)

对于每个id,最多有一个标签“h”或“w”,但可能有多个“v”。我想选择所有满足以下条件的 id:

每个 id 都有: 1.一个标签“h”及其标签= null, 2.一个标签“w”及其标签!= null, 3. 每个id至少有一个标签“v”。

我认为我需要创建三列来检查上述每个条件。然后我需要按“id”分组。

val hCheck = (label: String, tag: String) => if (label=="h" && tag==null) 1 else 0
val udfHCheck = udf(hCheck)
val wCheck = (label: String, tag: String) => if (label=="w" && tag!=null) 1 else 0
val udfWCheck = udf(wCheck)
val vCheck = (label: String) => if (label==null) 1 else 0
val udfVCheck = udf(vCheck)

dfx = df.withColumn("hCheck", udfHCheck(col("label"), col("tag")))
        .withColumn("wCheck", udfWCheck(col("label"), col("tag")))
        .withColumn("vCheck", udfVCheck(col("label")))
        .select("id","hCheck","wCheck","vCheck")
        .groupBy("id")

不知何故,我需要将三列 "hCheck","wCheck","vCheck" 分组到列表 [x,0,0],[0,x,0],[0,0,x] 的向量中.并检查这些向量是否包含所有三个 [1,0,0],[0,1,0],[0,0,1]

我还没能解决这个问题。可能有比这个更好的方法。希望有人能给我建议。谢谢

【问题讨论】:

【参考方案1】:

要将这三个检查转换为向量,您可以执行以下操作: 具体可以这样做:

val df1 = df.withColumn("hCheck", udfHCheck(col("label"), col("tag")))
            .withColumn("wCheck", udfWCheck(col("label"), col("tag")))
            .withColumn("vCheck", udfVCheck(col("label")))
            .select($"id",array($"hCheck",$"wCheck",$"vCheck").as("vec"))

接下来,groupby 返回一个分组对象,您需要在该对象上执行聚合。特别是要获取所有向量,您应该执行以下操作:

    .groupBy("id").agg(collect_list($"vec"))

此外,您不需要 udfs 进行各种检查。您可以使用列语义来做到这一点。例如udfHCheck可以写成:

with($"label" == lit("h") && tag.isnull 1).otherwise(0)

顺便说一句,你说你想要一个标签'v',但在 vcheck 中你只检查标签是否为空。

更新:替代解决方案

再看这个问题,我会做这样的事情:

val grouped = df.groupBy("id", "label").agg(count("$label").as("cnt"), first($"tag").as("tag"))
val filtered1 = grouped.filter($"label" === "v" || $"cnt" === 1)
val filtered2 = filtered.filter($"label" === "v" || ($"label" === "h" && $"tag".isNull) || ($"label" === "w" && $"tag".isNotNull))
val ids = filtered2.groupBy("id").count.filter($"count" === 3)

这个想法是,首先我们将 id 和 label 分组,这样我们就有了组合的信息。我们收集的信息是有多少个值 (cnt) 和第一个元素(不管是哪个)。

现在我们执行两个过滤步骤: 1. 我们恰好需要一个 h 和一个 w 以及任意数量的 v,所以第一个过滤器可以得到这些情况。 2. 我们确保每个案例都符合所有规则。

现在我们只有符合规则的 id 和 label 的组合,所以为了使 id 合法,我们需要恰好有三个 label 实例。这导致第二个 groupby 简单地计算与规则匹配的标签数量。我们需要恰好三个是合法的(即符合所有规则)。

【讨论】:

我找到了一种解决方法,将数据帧转换回 rdd。但是您的解决方案效果很好,我喜欢它。非常感谢!

以上是关于如何在 Spark 数据框中进行分组和聚合后过滤?的主要内容,如果未能解决你的问题,请参考以下文章

大数据Spark DataFrame/DataSet常用操作

大数据Spark DataFrame/DataSet常用操作

如何在 Spark 中一次对多个列进行聚合

Spark数据框中的不同记录数

在数据框中绘制聚合分组

分组后如何按父值过滤结果?