SPARK DataFrame:删除组中的最大值

Posted

技术标签:

【中文标题】SPARK DataFrame:删除组中的最大值【英文标题】:SPARK DataFrame: Remove MAX value in a group 【发布时间】:2016-05-11 00:49:47 【问题描述】:

我的数据是这样的:

id | val
---------------- 
a1 |  10
a1 |  20
a2 |  5
a2 |  7
a2 |  2

如果我在“id”上分组,我正在尝试删除组中具有 MAX(val) 的行。

结果应该是这样的:

id | val
---------------- 
a1 |  10
a2 |  5
a2 |  2

我正在使用 SPARK DataFrame 和 SQLContext。我需要一些方法:

DataFrame df = sqlContext.sql("SELECT * FROM jsontable WHERE (id, val) NOT IN (SELECT is,MAX(val) from jsontable GROUP BY id)");

我该怎么做?

【问题讨论】:

【参考方案1】:

您可以使用数据框操作和窗口函数来做到这一点。假设您在数据框中有数据df1

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val maxOnWindow = max(col("val")).over(Window.partitionBy(col("id")))
val df2 = df1
  .withColumn("max", maxOnWindow)
  .where(col("val") < col("max"))
  .select("id", "val")

在 Java 中,等价物类似于:

import org.apache.spark.sql.functions.Window;
import static org.apache.spark.sql.functions.*;

Column maxOnWindow = max(col("val")).over(Window.partitionBy("id"));
DataFrame df2 = df1
    .withColumn("max", maxOnWindow)
    .where(col("val").lt(col("max")))
    .select("id", "val");

这是一篇关于窗口函数的好文章:https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

【讨论】:

谢谢丹尼尔!我现在正在尝试你的方法。只是想知道它比“加入”方法更好吗? @user3802925 由于您的数据最初位于 DataFrame 中,因此这种方法避免了将其转换为 RDD,这在性能和代码可读性方面已经有所提升。但是,我认为这两种方法之间的实际性能差异将取决于您的数据,并且需要进行一些测试才能得出结论。 @user3802925:抱歉,我没有看到 DataFrame 的加入版本。在这种情况下,只有我的第二个陈述成立。【参考方案2】:

下面是马里奥的scala代码的Java实现:

DataFrame df = sqlContext.read().json(input);
DataFrame dfMaxRaw = df.groupBy("id").max("val");
DataFrame dfMax = dfMaxRaw.select(
    dfMaxRaw.col("id").as("max_id"), dfMaxRaw.col("max(val)").as("max_val")
);
DataFrame combineMaxWithData = df.join(dfMax, df.col("id")
    .equalTo(dfMax.col("max_id")));
DataFrame finalResult = combineMaxWithData.filter(
    combineMaxWithData.col("id").equalTo(combineMaxWithData.col("max_id"))
        .and(combineMaxWithData.col("val").notEqual(combineMaxWithData.col("max_val"))) 
);

【讨论】:

看起来不错,虽然有点冗长,但这是 Java 所期望的 :)【参考方案3】:

下面是如何使用 RDD 和更 Scala 风格的方法来做到这一点:

// Let's first get the data in key-value pair format
val data = sc.makeRDD( Seq( ("a",20), ("a", 1), ("a",8), ("b",3), ("b",10), ("b",9) ) )

// Next let's find the max value from each group
val maxGroups = data.reduceByKey( Math.max(_,_) )

// We join the max in the group with the original data
val combineMaxWithData = maxGroups.join(data)

// Finally we filter out the values that agree with the max
val finalResults = combineMaxWithData.filter case (gid, (max,curVal)) => max != curVal .map case (gid, (max,curVal)) => (gid,curVal) 


println( finalResults.collect.toList )
>List((a,1), (a,8), (b,3), (b,9))

【讨论】:

我还没有在 Java 上做过任何 Spark ...这些 API 中的每一个都应该与 Java 等效。翻译可能并不难。主要思想应该适用。 感谢马里奥斯!下面是我对连接进行了一些修改的 Java 实现,我使用键而不是完全笛卡尔连接来连接。

以上是关于SPARK DataFrame:删除组中的最大值的主要内容,如果未能解决你的问题,请参考以下文章

如何在DataFrame组中执行算术运算在Spark中进行聚合? [重复]

删除Apache Spark DataFrame中的重复项,并保留尚未删除的值的行?

按最近的时间戳对 Spark DataFrame 中的行进行重复数据删除

根据列值有效地从宽 Spark Dataframe 中删除列

从Scala中的任意数据数据获取Spark DataFrame的最简单方法是什么?

spark利器2函数之dataframe全局排序id与分组后保留最大值行