I want to know how to count with filter in spark withColumn
Posted: 2021-02-14 10:45:57
Question: I want to extract the data below with Spark.
// DataSetTest is assumed to be a simple bean with fields: keyword, opt1, opt2, count
DataSetTest ro1 = new DataSetTest("apple", "fruit", "red", 3);
DataSetTest ro2 = new DataSetTest("apple", "fruit", "red", 4);
DataSetTest ro3 = new DataSetTest("car", "toy", "red", 1);
DataSetTest ro4 = new DataSetTest("bike", "toy", "white", 2);
DataSetTest ro5 = new DataSetTest("bike", "toy", "red", 5);
DataSetTest ro6 = new DataSetTest("apple", "fruit", "red", 3);
DataSetTest ro7 = new DataSetTest("car", "toy", "white", 7);
DataSetTest ro8 = new DataSetTest("apple", "fruit", "green", 1);
Dataset<Row> df = session.getSqlContext().createDataFrame(Arrays.asList(ro1, ro2, ro3, ro4, ro5, ro6, ro7, ro8), DataSetTest.class);
private void process() {
//1) group by keyword/opt1/opt2 and sum the counts
Dataset<Row> df2 = df.groupBy("keyword", "opt1", "opt2").sum("count");
//2) count by each opt1/opt2 combination & calculate the total
Dataset<Row> df3 = df2.withColumn("fruit_red", ???)
.withColumn("fruit_green", ???)
.withColumn("toy_red", ???)
.withColumn("toy_white", ???)
.withColumn("total_count", ???);
//3) calculate the percent
Dataset<Row> df4 = df3.withColumn("percent", df3.col("total_count").divide("??sum of total_count??"));
Do you know how to compute parts 2) and 3)?
Comments:
2) use pivot; 3) use a window function to get the total count
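To make the window-function hint for 3) concrete, here is a minimal sketch, assuming df3 from the question already has its total_count column filled in; the names grand_total and the empty partitionBy() approach are my own, not from the comment:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.sum;

// An empty partitionBy() defines one window spanning the whole dataset,
// so sum(...).over(all) attaches the grand total to every row
// without collecting anything to the driver.
WindowSpec all = Window.partitionBy();
Dataset<Row> df4 = df3
    .withColumn("grand_total", sum(col("total_count")).over(all))
    .withColumn("percent", col("total_count").divide(col("grand_total")))
    .drop("grand_total");

Note that Spark will warn about a window with no partitioning, since all rows move to a single partition; for a small grouped result like this one that is harmless.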
Answer 1:
I'm not a Java expert, but you could do it like this:
// Assumed static imports for the snippet below (not shown in the original answer):
// import static org.apache.spark.sql.functions.*;
Logger.getLogger("org").setLevel(Level.ERROR);
DataSetTest ro1 = new DataSetTest("apple", "fruit", "red", 3);
DataSetTest ro2 = new DataSetTest("apple", "fruit", "red", 4);
DataSetTest ro3 = new DataSetTest("car", "toy", "red", 1);
DataSetTest ro4 = new DataSetTest("bike", "toy", "white", 2);
DataSetTest ro5 = new DataSetTest("bike", "toy", "red", 5);
DataSetTest ro6 = new DataSetTest("apple", "fruit", "red", 3);
DataSetTest ro7 = new DataSetTest("car", "toy", "white", 7);
DataSetTest ro8 = new DataSetTest("apple", "fruit", "green", 1);
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SaavnAnalyticsProject");
SparkSession sc = SparkSession.builder().config(conf).getOrCreate();
Dataset<Row> df = sc.createDataFrame(Arrays.asList(ro1, ro2, ro3, ro4, ro5, ro6, ro7, ro8), DataSetTest.class);
// note: this answer assumes the bean exposes fields keyword/opt1/opt2/cnt
Dataset<Row> groupedDf = df.groupBy(col("keyword"), col("opt1"), col("opt2")).sum("cnt");
groupedDf = groupedDf.withColumn("concatCol", concat(col("opt1"), lit("_"), col("opt2")));
groupedDf = groupedDf.drop(col("opt1")).drop(col("opt2"));
groupedDf.show();
Dataset<Row> pivotedDF = groupedDf.groupBy(col("keyword")).pivot("concatCol").sum("sum(cnt)").na().fill(0);
// ArrayUtil.removeFromArray is a small helper that drops "keyword" from the column list
String[] cols = ArrayUtil.removeFromArray(pivotedDF.columns(), "keyword");
String exp = String.join(" + ", cols);
System.out.println(exp);
pivotedDF = pivotedDF.withColumn("total", expr(exp));
pivotedDF.show();
The result looks like this:
+-------+-----------+---------+-------+---------+-----+
|keyword|fruit_green|fruit_red|toy_red|toy_white|total|
+-------+-----------+---------+-------+---------+-----+
|  apple|          1|       10|      0|        0|   11|
|    car|          0|        0|      1|        7|    8|
|   bike|          0|        0|      5|        2|    7|
+-------+-----------+---------+-------+---------+-----+
Then:
Long sum = pivotedDF.agg(sum("total")).first().getLong(0);
pivotedDF = pivotedDF
.withColumn("sum", lit(sum))
.withColumn("percent", col("total")
.divide(col("sum"))).drop(col("sum"));
Result:
+-------+-----------+---------+-------+---------+-----+------------------+
|keyword|fruit_green|fruit_red|toy_red|toy_white|total|           percent|
+-------+-----------+---------+-------+---------+-----+------------------+
|  apple|          1|       10|      0|        0|   11|0.4230769230769231|
|    car|          0|        0|      1|        7|    8|0.3076923076923077|
|   bike|          0|        0|      5|        2|    7|0.2692307692307692|
+-------+-----------+---------+-------+---------+-----+------------------+
You can get more readable code with Python or Scala.
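For the readability point: when the opt1/opt2 combinations are known up front, a hedged alternative in Java (my sketch, not part of the original answer; column names opt1/opt2/cnt follow the answer's code) is conditional aggregation with sum(when(...)), which skips both the pivot and the concatenated helper column:

import static org.apache.spark.sql.functions.*;

// One sum(when(...)) per known combination; rows that don't match contribute 0.
Dataset<Row> alt = df.groupBy(col("keyword"))
    .agg(
        sum(when(col("opt1").equalTo("fruit").and(col("opt2").equalTo("red")), col("cnt")).otherwise(0)).alias("fruit_red"),
        sum(when(col("opt1").equalTo("fruit").and(col("opt2").equalTo("green")), col("cnt")).otherwise(0)).alias("fruit_green"),
        sum(when(col("opt1").equalTo("toy").and(col("opt2").equalTo("red")), col("cnt")).otherwise(0)).alias("toy_red"),
        sum(when(col("opt1").equalTo("toy").and(col("opt2").equalTo("white")), col("cnt")).otherwise(0)).alias("toy_white"))
    .withColumn("total", expr("fruit_red + fruit_green + toy_red + toy_white"));

This trades the generic pivot for explicit columns, which reads more directly when the set of combinations is small and fixed, but pivot remains the better choice when the combinations are not known in advance.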
Comments: