Apache Spark - Window function, FIRST_VALUE does not work

Posted: 2018-05-15 09:01:49

I have a question about the Spark window function API:
My question is similar to this one: How to drop duplicates using conditions

I have a dataset:
+---+----------+---------+
| ID| VALUEE| OTHER|
+---+----------+---------+
| 1| null|something|
| 1|[1.0, 0.0]|something|
| 1|[1.0, 0.0]|something|
| 1|[0.0, 2.0]|something|
| 1|[3.0, 5.0]|something|
| 2|[3.0, 5.0]|something|
| 1|[3.0, 5.0]|something|
| 2| null|something|
| 3|[3.0, 5.0]|something|
| 4| null|something|
+---+----------+---------+
I want to keep only one row per ID (no duplicates). I don't care which VALUEE is kept, but I would prefer a non-null value.

Expected result:
+---+----------+---------+
| ID| VALUEE| OTHER|
+---+----------+---------+
| 1|[0.0, 2.0]|something|
| 3|[3.0, 5.0]|something|
| 4| null|something|
| 2|[3.0, 5.0]|something|
+---+----------+---------+
A window function with the aggregate function first() does not work, whereas with row_number() it works. But I don't understand why first() does not work:
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.spark_project.guava.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.StringType;
import static org.apache.spark.sql.types.DataTypes.createStructField;
public class TestSOF {

    public static void main(String[] args) {

        StructType schema = new StructType(new StructField[]{
                createStructField("ID", IntegerType, false),
                createStructField("VALUEE", DataTypes.createArrayType(DataTypes.DoubleType), true),
                createStructField("OTHER", StringType, true),
        });

        double[] a = new double[]{1.0, 0.0};
        double[] b = new double[]{3.0, 5.0};
        double[] c = new double[]{0.0, 2.0};
        List<Row> listOfdata = new ArrayList<>();
        listOfdata.add(RowFactory.create(1, null, "something"));
        listOfdata.add(RowFactory.create(1, a, "something"));
        listOfdata.add(RowFactory.create(1, a, "something"));
        listOfdata.add(RowFactory.create(1, c, "something"));
        listOfdata.add(RowFactory.create(1, b, "something"));
        listOfdata.add(RowFactory.create(2, b, "something"));
        listOfdata.add(RowFactory.create(1, b, "something"));
        listOfdata.add(RowFactory.create(2, null, "something"));
        listOfdata.add(RowFactory.create(3, b, "something"));
        listOfdata.add(RowFactory.create(4, null, "something"));

        List<Row> rowList = ImmutableList.copyOf(listOfdata);

        SparkSession sparkSession = new SparkSession.Builder().config("spark.master", "local[*]").getOrCreate();
        sparkSession.sparkContext().setLogLevel("ERROR");

        Dataset<Row> dataset = sparkSession.createDataFrame(rowList, schema);
        dataset.show();

        WindowSpec windowSpec = Window.partitionBy(dataset.col("ID")).orderBy(dataset.col("VALUEE").asc_nulls_last());
        // groupBy solution: works, but loses the OTHER column (lost information)
        Dataset<Row> dataset0 = dataset.groupBy("ID").agg(functions.first(dataset.col("VALUEE"), true));

        // window solution with row_number(): works
        Dataset<Row> dataset1 = dataset.withColumn("new", functions.row_number().over(windowSpec)).where("new = 1").drop("new");

        // window solution with first(): does not work
        Dataset<Row> dataset2 = dataset.withColumn("new", functions.first("VALUEE", true).over(windowSpec)).drop("new");
        // RDD solution: take the first row with a non-null VALUEE per ID, else any row
        JavaRDD<Row> rdd = dataset.toJavaRDD()
                .groupBy(row -> row.getAs("ID"))
                .map(g -> {
                    Iterator<Row> iter = g._2.iterator();
                    Row rst = null;
                    Row tmp;
                    while (iter.hasNext()) {
                        tmp = iter.next();
                        if (tmp.getAs("VALUEE") != null) {
                            rst = tmp;
                            break;
                        }
                        if (rst == null) {
                            rst = tmp;
                        }
                    }
                    return rst;
                });
        Dataset<Row> dataset3 = sparkSession.createDataFrame(rdd, schema);

        dataset0.show();
        dataset1.show();
        dataset2.show();
        dataset3.show();
    }
}
Answer 1:

first is not a window function in Spark 2.3; it is only an aggregate function. firstValue does not exist in the DataFrame API.
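For illustration, the supported ranking route is essentially the question's dataset1, restated here as a standalone hedged sketch (it assumes the question's dataset is in scope):

        // row_number() is a genuine ranking window function, so keeping rank 1
        // per ID yields exactly one row, preferring a non-null VALUEE.
        WindowSpec ws = Window.partitionBy(dataset.col("ID"))
                .orderBy(dataset.col("VALUEE").asc_nulls_last());
        Dataset<Row> oneRowPerId = dataset
                .withColumn("rn", functions.row_number().over(ws))
                .where("rn = 1")
                .drop("rn");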
Answer 2:

You can use a solution equivalent to the one you posted. In your case, the nulls will come first in the ordering, so:
val df: DataFrame = ???
import df.sparkSession.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, last}

val id_cols = "ID"
val windowSpec = Window.partitionBy(id_cols).orderBy($"VALUEE".asc)
val list_cols = Seq("VALUEE", "OTHER")
val df_dd = df.select(col(id_cols) +: list_cols.map(x => last(col(x)).over(windowSpec).alias(x)): _*).distinct
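A rough Java adaptation of this Scala snippet, as a hypothetical sketch rather than the answer author's code (it assumes the question's dataset is in scope, and adds an explicit full-partition frame so that last() sees the whole partition instead of the default running frame):

        // With ascending order, nulls sort first, so last() over the full
        // partition lands on a non-null VALUEE whenever one exists;
        // distinct() then collapses the partition to one row per ID.
        WindowSpec ws = Window.partitionBy(dataset.col("ID"))
                .orderBy(dataset.col("VALUEE").asc())
                .rowsBetween(Window.unboundedPreceding(), Window.unboundedFollowing());
        Dataset<Row> deduped = dataset.select(
                dataset.col("ID"),
                functions.last(dataset.col("VALUEE")).over(ws).alias("VALUEE"),
                functions.last(dataset.col("OTHER")).over(ws).alias("OTHER")
        ).distinct();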
Answer 3:

For the sample data you provided, a short version of your solution dataset1:
dataset.groupBy("ID").agg(functions.first(dataset.col("VALUEE"), true)).show();
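This drops the OTHER column, though. If you also need to keep it (the concern raised in the comment below), one hedged variant is to aggregate every non-key column the same way; note that first() on different columns may pick its values from different rows:

        // Sketch: aggregate each non-key column with first(..., ignoreNulls = true)
        // so the result keeps ID, VALUEE and OTHER in a single pass.
        Dataset<Row> dedupedAll = dataset.groupBy("ID").agg(
                functions.first(dataset.col("VALUEE"), true).alias("VALUEE"),
                functions.first(dataset.col("OTHER"), true).alias("OTHER"));
        dedupedAll.show();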
To understand window functions, and the performance of window functions versus groupBy in Spark, I strongly recommend Jacek Laskowski's talks:

- https://databricks.com/session/from-basic-to-advanced-aggregate-operators-in-apache-spark-sql-2-2-by-examples-and-their-catalyst-optimizations
- https://databricks.com/session/from-basic-to-advanced-aggregate-operators-in-apache-spark-sql-2-2-by-examples-and-their-catalyst-optimizations-continues
Comments:

I don't want to lose the other columns of the dataset. What do you think of my RDD solution (see dataset3)? Thanks again.