I want to know how to count with filter in spark withColumn

Posted 2021-02-14 10:45:57


Question:

As shown below, I want to extract data with Spark.

DataSetTest ro1 = new DataSetTest("apple", "fruit", "red", 3);
DataSetTest ro2 = new DataSetTest("apple", "fruit", "red", 4);
DataSetTest ro3 = new DataSetTest("car", "toy", "red", 1);
DataSetTest ro4 = new DataSetTest("bike", "toy", "white", 2);
DataSetTest ro5 = new DataSetTest("bike", "toy", "red", 5);
DataSetTest ro6 = new DataSetTest("apple", "fruit", "red", 3);
DataSetTest ro7 = new DataSetTest("car", "toy", "white", 7);
DataSetTest ro8 = new DataSetTest("apple", "fruit", "green", 1);

Dataset<Row> df = session.getSqlContext().createDataFrame(Arrays.asList(ro1, ro2, ro3, ro4, ro5, ro6, ro7, ro8), DataSetTest.class);
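
The DataSetTest bean itself isn't shown in the question; a minimal version consistent with the constructor calls above could look like the following. The field names keyword, opt1, opt2, and count are assumptions (the answer below refers to the numeric field as cnt instead); Spark derives the DataFrame columns from the bean's getters.

public static class DataSetTest implements java.io.Serializable {
    // assumed fields, matching the four constructor arguments above
    private String keyword;
    private String opt1;
    private String opt2;
    private int count;

    public DataSetTest() {}

    public DataSetTest(String keyword, String opt1, String opt2, int count) {
        this.keyword = keyword;
        this.opt1 = opt1;
        this.opt2 = opt2;
        this.count = count;
    }

    public String getKeyword() { return keyword; }
    public void setKeyword(String keyword) { this.keyword = keyword; }
    public String getOpt1() { return opt1; }
    public void setOpt1(String opt1) { this.opt1 = opt1; }
    public String getOpt2() { return opt2; }
    public void setOpt2(String opt2) { this.opt2 = opt2; }
    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }
}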

private void process() {
    // 1) group by key
    Dataset<Row> df2 = df.groupBy("keyword", "opt1", "opt2").sum("count");

    // 2) count per opt combination & calculate the total number
    Dataset<Row> df3 = df2.withColumn("fruit_red", ???)
            .withColumn("fruit_green", ???)
            .withColumn("toy_red", ???)
            .withColumn("toy_white", ???)
            .withColumn("total_count", ???);

    // 3) calculate the percent
    Dataset<Row> df4 = df3.withColumn("percent", df3.col("total_count").divide("??sum of total_count??"));
}

Do you know how to compute parts 2) and 3)?

Comments:

For 2), use pivot; for 3), use a window function to get the total.
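
To make the comment's window-function suggestion concrete: a minimal sketch for 3) in Java, assuming the question's df3 already has its total_count column filled in and that org.apache.spark.sql.functions is statically imported. An empty partitionBy() defines one window over the whole dataset, so the grand total lands on every row without collecting anything to the driver:

import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

// One global window (no partition key); Spark logs a warning that all data
// moves to a single partition, which is acceptable for a small aggregate.
WindowSpec all = Window.partitionBy();
Dataset<Row> df4 = df3
        .withColumn("grand_total", sum("total_count").over(all))
        .withColumn("percent", col("total_count").divide(col("grand_total")))
        .drop("grand_total");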

Answer 1:

I'm not a Java expert, but you can do it like this:

// assumes the usual Spark imports plus: import static org.apache.spark.sql.functions.*;
// note: this answer's DataSetTest names the numeric field "cnt" (the question used "count")
Logger.getLogger("org").setLevel(Level.ERROR);
DataSetTest ro1 = new DataSetTest("apple", "fruit", "red", 3);
DataSetTest ro2 = new DataSetTest("apple", "fruit", "red", 4);
DataSetTest ro3 = new DataSetTest("car", "toy", "red", 1);
DataSetTest ro4 = new DataSetTest("bike", "toy", "white", 2);
DataSetTest ro5 = new DataSetTest("bike", "toy", "red", 5);
DataSetTest ro6 = new DataSetTest("apple", "fruit", "red", 3);
DataSetTest ro7 = new DataSetTest("car", "toy", "white", 7);
DataSetTest ro8 = new DataSetTest("apple", "fruit", "green", 1);
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SaavnAnalyticsProject");
SparkSession sc = SparkSession.builder().config(conf).getOrCreate();
Dataset<Row> df = sc.createDataFrame(Arrays.asList(ro1, ro2, ro3, ro4, ro5, ro6, ro7, ro8), DataSetTest.class);

// sum cnt per (keyword, opt1, opt2); the result column is literally named "sum(cnt)"
Dataset<Row> groupedDf = df.groupBy(col("keyword"), col("opt1"), col("opt2")).sum("cnt");
// build a combined "opt1_opt2" label (fruit_red, toy_white, ...) and drop the parts
groupedDf = groupedDf.withColumn("concatCol", concat(col("opt1"), lit("_"), col("opt2")));
groupedDf = groupedDf.drop(col("opt1")).drop(col("opt2"));
groupedDf.show();
// pivot the label into one column per combination, filling missing combinations with 0
Dataset<Row> pivotedDF = groupedDf.groupBy(col("keyword")).pivot("concatCol").sum("sum(cnt)").na().fill(0);


// ArrayUtil.removeFromArray is a helper (not a Spark API) that drops "keyword"
// from the column list; the rest are joined into "fruit_green + fruit_red + ..."
String[] cols = ArrayUtil.removeFromArray(pivotedDF.columns(), "keyword");
String exp = String.join(" + ", cols);
System.out.println(exp);
pivotedDF = pivotedDF.withColumn("total", expr(exp));

pivotedDF.show();

The result looks like this:

+-------+-----------+---------+-------+---------+-----+
|keyword|fruit_green|fruit_red|toy_red|toy_white|total|
+-------+-----------+---------+-------+---------+-----+
|  apple|          1|       10|      0|        0|   11|
|    car|          0|        0|      1|        7|    8|
|   bike|          0|        0|      5|        2|    7|
+-------+-----------+---------+-------+---------+-----+

Then:

// collect the grand total to the driver, attach it as a literal column,
// divide, then drop the helper column
Long sum = pivotedDF.agg(sum("total")).first().getLong(0);
pivotedDF = pivotedDF
        .withColumn("sum", lit(sum))
        .withColumn("percent", col("total").divide(col("sum")))
        .drop(col("sum"));

Result:

+-------+-----------+---------+-------+---------+-----+------------------+
|keyword|fruit_green|fruit_red|toy_red|toy_white|total|           percent|
+-------+-----------+---------+-------+---------+-----+------------------+
|  apple|          1|       10|      0|        0|   11|0.4230769230769231|
|    car|          0|        0|      1|        7|    8|0.3076923076923077|
|   bike|          0|        0|      5|        2|    7|0.2692307692307692|
+-------+-----------+---------+-------+---------+-----+------------------+

You could get more readable code with Python or Scala.
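
On the title's "count with filter": step 2) can also be written without pivot, as conditional sums, which is closer to what the question's withColumn placeholders were reaching for. A hedged sketch, assuming this answer's cnt field name and static imports of org.apache.spark.sql.functions:

// Each column sums cnt only where the opt1/opt2 pair matches;
// rows that fail the condition contribute 0.
Dataset<Row> direct = df.groupBy(col("keyword"))
        .agg(
            sum(when(col("opt1").equalTo("fruit").and(col("opt2").equalTo("red")), col("cnt")).otherwise(0)).alias("fruit_red"),
            sum(when(col("opt1").equalTo("fruit").and(col("opt2").equalTo("green")), col("cnt")).otherwise(0)).alias("fruit_green"),
            sum(when(col("opt1").equalTo("toy").and(col("opt2").equalTo("red")), col("cnt")).otherwise(0)).alias("toy_red"),
            sum(when(col("opt1").equalTo("toy").and(col("opt2").equalTo("white")), col("cnt")).otherwise(0)).alias("toy_white"),
            sum(col("cnt")).alias("total"));

Unlike pivot, this needs the opt1/opt2 combinations to be known up front, but it stays a single aggregation.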

