Spark drop 重复并选择具有最大值的行

Posted

技术标签:

【中文标题】Spark drop 重复并选择具有最大值的行【英文标题】:Spark drop duplicates and select row with max value 【发布时间】:2020-11-17 21:20:38 【问题描述】:

我正在尝试根据 column1 删除重复项并选择 column2 中具有最大值的行。 column2 具有“年”(2019、2020 等)作为值,它的类型为“字符串”。我的解决方案是将第 2 列转换为整数并选择最大值。

Dataset<Row> ds ; //The dataset with column1,column2(year), column3 etc.
Dataset<Row> newDs = ds.withColumn("column2Int", col("column2").cast(DataTypes.IntegerType));
newDs  = newDs.groupBy("column1").max("column2Int"); // drops all other columns

当我执行“分组依据”时,这种方法会删除原始数据集“ds”中的所有其他列。所以我必须在 'ds' 和 'newDS' 之间进行连接以取回所有原始列。此外,将 String 列强制转换为 Integer 似乎是一种无效的解决方法。

是否可以删除重复项并从原始数据集本身获取具有更大字符串值的行?

【问题讨论】:

【参考方案1】:

这是一个经典的重复数据删除问题,您需要为此使用 Window + Rank + filter 组合。

我对 Java 语法不是很熟悉,但是示例代码应该如下所示,

    import org.apache.spark.sql.expressions.Window;
    import org.apache.spark.sql.expressions.WindowSpec;
    import org.apache.spark.sql.functions;
    import org.apache.spark.sql.types.DataTypes;
    
    Dataset<Row> df = ???;
    
    WindowSpec windowSpec = Window.partitionBy("column1").orderBy(functions.desc("column2Int"));
    
    Dataset<Row> result =
        df.withColumn("column2Int", functions.col("column2").cast(DataTypes.IntegerType))
            .withColumn("rank", functions.rank().over(windowSpec))
            .where("rank == 1")
            .drop("rank");

    result.show(false);

发生的事情的概述,

    将转换后的整数列添加到 df 以供将来排序。 根据 column1 的值在您的数据集(分区)中形成子部分/窗口 对于这些子部分/窗口/分区中的每一个,行都按强制转换为 int 的列进行排序。描述您想要的最大顺序。 像行号这样的排名被分配给每个创建的分区/窗口中的行。 对所有排名为 1 的行进行过滤(最大值,因为排序是 desc。)

【讨论】:

感谢您的详细解决方案。仅当 column2 具有特定 column1 值的多个相同值时,它才会失败。但我能应付。此外,我认为 order by 也可以应用于 String column2,它给出了最大年份值。您认为您的解决方案会比 Question 中的解决方案提供更好的性能吗? 或者,您可以只做max("column2").over(windowSpec) 而不是对整个分区进行排名,不是吗?

以上是关于Spark drop 重复并选择具有最大值的行的主要内容,如果未能解决你的问题,请参考以下文章

SQL仅选择列上具有最大值的行[重复]

SQL仅选择列上具有最大值的行[重复]

SQL仅选择列上具有最大值的行[重复]

SQL仅选择列上具有最大值的行[重复]

SQL仅选择列上具有最大值的行[重复]

Spark DataFrames中的argmax:如何检索具有最大值的行