Spark Dataframe GroupBy 和计算复杂聚合函数

Posted

技术标签:

【中文标题】Spark Dataframe GroupBy 和计算复杂聚合函数【英文标题】:Spark Dataframe GroupBy and compute Complex aggregate function 【发布时间】:2017-10-04 07:51:45 【问题描述】:

使用 Spark 数据框,我需要使用以下方法计算百分比 复杂的公式:

按“KEY”分组并计算“re_pct”为 ( sum(sa) / sum( sa / (pct/100) ) ) * 100

例如,输入数据框是

val values1 = List(List("01", "20000", "45.30"), List("01", "30000", "45.30"))
  .map(row => (row(0), row(1), row(2)))

val DS1 = values1.toDF("KEY", "SA", "PCT")
DS1.show()

+---+-----+-----+
|KEY|   SA|  PCT|
+---+-----+-----+
| 01|20000|45.30|
| 01|30000|45.30|
+---+-----+-----+

预期结果:

+---+-----+--------------+
|KEY|    re_pcnt         |
+---+-----+--------------+
| 01|   45.30000038505   |
+---+-----+--------------+

我试着计算如下

    val result = DS1.groupBy("KEY").agg(((sum("SA").divide(
  sum(
    ("SA").divide(
      ("PCT").divide(100)
    )
  )
)) * 100).as("re_pcnt"))

但面临错误:(36, 16) value 除法不是 String ("SA").divide(

对实现上述逻辑有什么建议吗?

【问题讨论】:

【参考方案1】:

您可以尝试导入spark.implicits._,然后使用$ 引用列。

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

val result = DS1.groupBy("KEY")
  .agg(((sum($"SA").divide(sum(($"SA").divide(($"PCT").divide(100))))) * 100)
  .as("re_pcnt"))

这将为您提供所需的输出。

如果您不想导入,可以随时使用col() 命令代替$


使用expr() 可以将字符串用作agg() 函数的输入。但是,输入字符串需要稍作更改。下面给出了与以前完全相同的结果,但使用了一个字符串:

val opr = "sum(SA)/(sum(SA/(PCT/100))) * 100"
val df = DS1.groupBy("KEY").agg(expr(opr).as("re_pcnt"))

注意.as("re_pcnt")需要在agg()方法里面,不能在外面。

【讨论】:

所以在这种情况下是使用 groupy 键或仅最后一个键执行的 sum() @Dee:不确定我是否理解您的问题,但聚合是在每个键上单独执行的。如果输入中有多个键,则输出将有多行。 抱歉,我的问题写得很糟糕。我玩过这个并意识到我想做的事情是不可能的,即我想做的是 sum( log(price) * (quantity / sum(quantity)) ) 或 sum( log(price) * (quantity / sum(quantity) over (partition by id_col) )) 但我不能将 agg 或 window 函数与聚合函数一起使用。在这种情况下,外部 sum()【参考方案2】:

您的代码几乎可以完美运行。您只需输入“$”符号即可指定您正在传递一列:

val result = DS1.groupBy($"KEY").agg(((sum($"SA").divide(
  sum(
    ($"SA").divide(
      ($"PCT").divide(100)
    )
  )
)) * 100).as("re_pcnt"))

这是输出:

result.show()
+---+-------+                                                                   
|KEY|re_pcnt|
+---+-------+
| 01|   45.3|
+---+-------+

【讨论】:

以上是关于Spark Dataframe GroupBy 和计算复杂聚合函数的主要内容,如果未能解决你的问题,请参考以下文章

在 groupby 之后将 Spark DataFrame 的行聚合到 String

spark sql DataFrame 的 groupBy+agg 与 groupByKey+mapGroups

spark sql DataFrame 的 groupBy+agg 与 groupByKey+mapGroups

Spark Streaming Dataframe 执行,有状态,分区本地 groupBy,避免洗牌

Apache Spark Dataframe Groupby agg() 用于多列

Spark DataFrame 的通用“reduceBy”或“groupBy + aggregate”功能