Spark:在每个组中过滤

Posted

技术标签:

【中文标题】Spark:在每个组中过滤【英文标题】:Spark: filter in each group 【发布时间】:2021-03-26 11:13:08 【问题描述】:

我有一个类似的数据框

+------+-------------------+------+
|group |               time| label|
+------+-------------------+------+
|     a|2020-01-01 10:49:00|first |
|     a|2020-01-01 10:51:00|second|
|     a|2020-01-01 12:49:00|first |
|     b|2020-01-01 12:44:00|second|
|     b|2020-01-01 12:46:00|first |
|     c|2020-01-01 12:46:00|third |
+------+-------------------+------+

我想删除对于每个组,标签 first 比标签 secondthird 更新的所有行。例如在a 组中,带有first2020-01-01 12:49:00 的行应该被删除,因为有一个带有second 标签的旧行。

期望的输出是:

+------+-------------------+------+
|group |               time| label|
+------+-------------------+------+
|     a|2020-01-01 10:49:00|first |
|     a|2020-01-01 10:51:00|second|
|     b|2020-01-01 12:44:00|second|
|     c|2020-01-01 12:46:00|third |
+------+-------------------+------+

group划分的窗口函数会在每个组内进行过滤,但是如何实现标签上的过滤?

【问题讨论】:

【参考方案1】:

您可以使用不是“第一”的标签获取上一次,并使用该列进行过滤:

import org.apache.spark.sql.expressions.Window

val df2 = df.withColumn(
    "non_first_time", 
    last(
        when(col("label") =!= "first", col("time")), 
        true
    ).over(
        Window.partitionBy("group").orderBy("time")
    )
).filter("""
    label != 'first' or 
    (label = 'first' and (non_first_time > time or non_first_time is null))
""").drop("non_first_time")

df2.show
+-----+-------------------+------+
|group|               time| label|
+-----+-------------------+------+
|    c|2020-01-01 12:46:00| third|
|    b|2020-01-01 12:44:00|second|
|    a|2020-01-01 10:49:00| first|
|    a|2020-01-01 10:51:00|second|
+-----+-------------------+------+

【讨论】:

以上是关于Spark:在每个组中过滤的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark Scala 中向数据集添加过滤条件

Spark SQL 二次过滤和分组

如何使用 NSFetchedResultsController 设置过滤指定组中项目的 NSPredicate

在新列上过滤 Spark DataFrame

当我们在 where 中使用过滤器时,spark 如何读取数据

错误:所有观察都在同一组中,同时从闪亮的 ui 列表中动态过滤 R 模型