Spark多个动态聚合函数,countDistinct不起作用

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark多个动态聚合函数,countDistinct不起作用相关的知识,希望对你有一定的参考价值。

使用多个动态聚合操作在Spark数据帧上进行聚合。

我想使用带有多个动态聚合操作(由用户在JSON中传递)的Scala对Spark数据帧进行聚合。我正在将JSON转换为Map

以下是一些示例数据:

colA    colB    colC    colD
1       2       3       4
5       6       7       8
9       10      11      12

我正在使用的Spark聚合代码:

var cols = ["colA","colB"]
var aggFuncMap = Map("colC"-> "sum", "colD"-> "countDistinct")
var aggregatedDF = currentDF.groupBy(cols.head, cols.tail: _*).agg(aggFuncMap)

我必须将aggFuncMap作为Map传递,以便用户可以通过JSON配置传递任意数量的聚合。

上面的代码适用于某些聚合,包括summinmaxavgcount

但是,不幸的是,这段代码不适用于countDistinct(也许是因为它是驼峰式的?)。

运行上面的代码时,我收到此错误:

线程“main”中的异常org.apache.spark.sql.AnalysisException:未定义的函数:'countdistinct'。此函数既不是已注册的临时函数,也不是在数据库'default'中注册的永久函数

任何帮助将不胜感激!

答案

目前无法在agg中使用countDistinctMap。从documentation我们看到:

可用的聚合方法是avg,max,min,sum,count。


一个可能的解决办法是将Map改为Seq[Column]

val cols = Seq("colA", "colB")
val aggFuncs = Seq(sum("colC"), countDistinct("colD"))
val df2 = df.groupBy(cols.head, cols.tail: _*).agg(aggFuncs.head, aggFuncs.tail: _*)

但如果用户要在配置文件中指定聚合,这将无济于事。

另一种方法是使用expr,此函数将评估一个字符串并返回一列。但是,expr不接受"countDistinct",而是需要使用"count(distinct(...))"。这可以编码如下:

val aggFuncs = Seq("sum(colC)", "count(distinct(colD))").map(e => expr(e))
val df2 = df.groupBy(cols.head, cols.tail: _*).agg(aggFuncs.head, aggFuncs.tail: _*)

以上是关于Spark多个动态聚合函数,countDistinct不起作用的主要内容,如果未能解决你的问题,请参考以下文章

极简spark教程spark聚合函数

Spark Scala数据框具有单个Group By的多个聚合[重复]

如何在 Spark 中一次对多个列进行聚合

Spark 系列—— Spark SQL 聚合函数 Aggregations

Spark调优聚合操作数据倾斜解决方案

spark中的聚合函数总结