如何在spark中分组后根据值进行过滤

Posted

技术标签:

【中文标题】如何在spark中分组后根据值进行过滤【英文标题】:how to filter depending on a value after grouping by in spark 【发布时间】:2020-04-21 18:56:50 【问题描述】:

假设我有以下数据框:

val a=Seq(("aa","b",1),("aa","c",5),("aa","d",0),("xx","y",5),("z","zz",9),("z","b",12)).toDF("name","tag","num").show
+----+---+---+
|name|tag|num|
+----+---+---+
|  aa|  b|  1|
|  aa|  c|  5|
|  aa|  d|  0|
|  xx|  y|  5|
|   z| zz|  9|
|   z|  b| 12|
+----+---+---+

我想过滤这个数据框,以便:

对于每组数据(按名称分组),如果列标签的值为“b”,我将取第 num 列的最大值,否则我忽略该行

这是我想要的输出:

+----+---+---+
|name|tag|num|
+----+---+---+
|  aa|  c|  5|
|   z|  b| 12|
+----+---+---+

说明

名称为 ='aa' 的行组包含其中 tag 的值 == 'b' 的行,因此我取该组的 num 的最大值,即 5。 名称为 ='xx' 的行组没有标签值 == 'b' 的行,因此它是 w 名称为 ='z' 的行组具有其中 tag 的值 == 'b' 的行,因此我取该组的 num 的最大值,即 12。

【问题讨论】:

你可以在 Spark 中创建一个 UserDefinedAggregateFunction。 不建议使用UDF,除非没有其他解决方案,我正在尝试使用window,我认为它可以提供帮助 随你所愿......只是说,但在这种情况下,你仍然需要一个聚合。祝你好运 【参考方案1】:

试试这个:

val df=Seq(("aa","b",1),("aa","c",5),("aa","d",0),("xx","y",5),("z","zz",9),("z","b",12)).toDF("name","tag","num")
df.createOrReplaceTempView("tab")

val res = spark.sql(""" with tw as (select t1.name, max(t1.num) as max_val
                          from tab t1 
                         where t1.name in (select distinct t2.name 
                                             from tab t2
                                            where t2.tag = 'b'
                                          )
                      group by t1.name )
                      select distinct tz.name, tz.tag, tz.num
                        from tab tz, tw
                       where tz.name = tw.name
                         and tz.num  = tw.max_val
                   """) 
res.show(false)

【讨论】:

感谢您的解决方案,我想出了另一个使用窗口的解决方案:` val w=Window.partitionBy("name")` 不止一种方式通向罗马……但我不认为 partitionBy 是这里明显的用例……不过,答案是正确的。 是的,你是对的,而且,窗口方式很复杂

以上是关于如何在spark中分组后根据值进行过滤的主要内容,如果未能解决你的问题,请参考以下文章

分组后如何按父值过滤结果?

当必须根据条件对记录进行分组时如何选择最多 x 行

如何在Kibana中过滤聚合统计值

如何根据过滤条件添加计数列而不是在dplyr中进行分组?

如何在 MDX 中对同一维度进行分组和过滤

ExtJs分组中的clearfilter后如何获取特定组的所有记录