如何在spark中分组后根据值进行过滤
Posted
技术标签:
【中文标题】如何在spark中分组后根据值进行过滤【英文标题】:how to filter depending on a value after grouping by in spark 【发布时间】:2020-04-21 18:56:50 【问题描述】:假设我有以下数据框:
val a=Seq(("aa","b",1),("aa","c",5),("aa","d",0),("xx","y",5),("z","zz",9),("z","b",12)).toDF("name","tag","num").show
+----+---+---+
|name|tag|num|
+----+---+---+
| aa| b| 1|
| aa| c| 5|
| aa| d| 0|
| xx| y| 5|
| z| zz| 9|
| z| b| 12|
+----+---+---+
我想过滤这个数据框,以便:
对于每组数据(按名称分组),如果列标签的值为“b”,我将取第 num 列的最大值,否则我忽略该行
这是我想要的输出:
+----+---+---+
|name|tag|num|
+----+---+---+
| aa| c| 5|
| z| b| 12|
+----+---+---+
说明
名称为 ='aa' 的行组包含其中 tag 的值 == 'b' 的行,因此我取该组的 num 的最大值,即 5。 名称为 ='xx' 的行组没有标签值 == 'b' 的行,因此它是 w 名称为 ='z' 的行组具有其中 tag 的值 == 'b' 的行,因此我取该组的 num 的最大值,即 12。【问题讨论】:
你可以在 Spark 中创建一个 UserDefinedAggregateFunction。 不建议使用UDF,除非没有其他解决方案,我正在尝试使用window,我认为它可以提供帮助 随你所愿......只是说,但在这种情况下,你仍然需要一个聚合。祝你好运 【参考方案1】:试试这个:
val df=Seq(("aa","b",1),("aa","c",5),("aa","d",0),("xx","y",5),("z","zz",9),("z","b",12)).toDF("name","tag","num")
df.createOrReplaceTempView("tab")
val res = spark.sql(""" with tw as (select t1.name, max(t1.num) as max_val
from tab t1
where t1.name in (select distinct t2.name
from tab t2
where t2.tag = 'b'
)
group by t1.name )
select distinct tz.name, tz.tag, tz.num
from tab tz, tw
where tz.name = tw.name
and tz.num = tw.max_val
""")
res.show(false)
【讨论】:
感谢您的解决方案,我想出了另一个使用窗口的解决方案:` val w=Window.partitionBy("name")` 不止一种方式通向罗马……但我不认为 partitionBy 是这里明显的用例……不过,答案是正确的。 是的,你是对的,而且,窗口方式很复杂以上是关于如何在spark中分组后根据值进行过滤的主要内容,如果未能解决你的问题,请参考以下文章