在窗口上聚合(总和)以获得列列表

Posted

技术标签:

【中文标题】在窗口上聚合(总和)以获得列列表【英文标题】:Aggregate (Sum) over Window for a list of Columns 【发布时间】:2017-06-13 06:18:35 【问题描述】:

对于 DataFrame 中可用的列列表,我无法找到一种通用方法来计算给定窗口上的 Sum(或任何聚合函数)。

val inputDF = spark
.sparkContext
.parallelize(
    Seq(
        (1,2,1, 30, 100),
        (1,2,2, 30, 100), 
        (1,2,3, 30, 100),
        (11,21,1, 30, 100),
        (11,21,2, 30, 100), 
        (11,21,3, 30, 100)
    ),
    10)
.toDF("c1", "c2", "offset", "v1", "v2")

input.show
+---+---+------+---+---+
| c1| c2|offset| v1| v2|
+---+---+------+---+---+
|  1|  2|     1| 30|100|
|  1|  2|     2| 30|100|
|  1|  2|     3| 30|100|
| 11| 21|     1| 30|100|
| 11| 21|     2| 30|100|
| 11| 21|     3| 30|100|
+---+---+------+---+---+

给定一个如上所示的DataFrame,很容易找到列列表的Sum,类似于下面显示的代码sn-p -

val groupKey = List("c1", "c2").map(x => col(x.trim))
    val orderByKey = List("offset").map(x => col(x.trim))

    val aggKey = List("v1", "v2").map(c => sum(c).alias(c.trim))

    import org.apache.spark.sql.expressions.Window

    val w = Window.partitionBy(groupKey: _*).orderBy(orderByKey: _*)

    val outputDF = inputDF
    .groupBy(groupKey: _*)
    .agg(aggKey.head, aggKey.tail: _*)

    outputDF.show

但我似乎无法通过窗口规范找到类似的聚合函数方法。到目前为止,我只能通过单独指定每一列来解决这个问题,如下所示 -

val outputDF2 = inputDF
    .withColumn("cumulative_v1", sum(when($"offset".between(-1, 1), inputDF("v1")).otherwise(0)).over(w))
    .withColumn("cumulative_v3", sum(when($"offset".between(-2, 2), inputDF("v1")).otherwise(0)).over(w))

如果有一种方法可以对动态列列表进行聚合,我将不胜感激。谢谢!

【问题讨论】:

您是否尝试过使用inputDF.types.foreach 谢谢。您能否详细说明在这种情况下我如何使用每个。我的 outputDF2 应该包含输入中的所有列以及列表中指定列的运行总和 【参考方案1】:

我认为我找到了一种比上述问题中所述的方法更好的方法。

/**
    * Utility method takes a DataFrame and a List of columns to return aggregated values for the specified list of columns
    * @param colsToAggregate    Seq[String] of all columns in the input DataFrame to be aggregated
    * @param inputDF            Input DataFrame
    * @param f                  aggregate function 'call by name'
    * @param partitionByColSeq  Seq[] of column names to partition the inputDF before applying the aggregate
    * @param orderByColSeq      Seq[] of column names to order the inputDF before applying the aggregate
    * @param name_prefix        String to prefix the new columns with, to avoid collisions
    * @param name               New column names. Uses Identify function and reuses aggregated column names
    * @return                   output DataFrame
    */
  def withRollingAggregateColumns(colsToAggregate: Seq[String],
                                  inputDF: DataFrame,
                                  f: String => Column,
                                  partitionByColSeq: Seq[String],
                                  orderByColSeq: Seq[String],
                                  name_prefix: String,
                                  name: String => String = identity) = 

    val groupByKey = partitionByColSeq.map(x => col(x.trim))
    val orderByKey = orderByColSeq.map(x => col(x.trim))

    import org.apache.spark.sql.expressions.Window

    val w = Window.partitionBy(groupByKey: _*).orderBy(orderByKey: _*)

    colsToAggregate
      .foldLeft(inputDF)(
        (df, elementInCols) => df
          .withColumn(
            name_prefix + "_" + name(elementInCols),
            f(elementInCols).over(w)
          )
      )
  

在这种情况下,Utility 方法将 DataFrame 作为输入,并根据提供的函数 f 附加新列。它使用“withColumn”和“foldLeft”语法来遍历需要聚合的列列表。为了避免任何列名冲突,它将用户提供的“前缀”附加到新的聚合列

【讨论】:

以上是关于在窗口上聚合(总和)以获得列列表的主要内容,如果未能解决你的问题,请参考以下文章

如何汇总 MongoDB 中的总和以获得总计数?

如何对窗口中的某个字段进行排序以获得前 N 个值,并对 DolphinDB 中的相应字段进行聚合计算?

如何获得二维数组中每一列和每一行的总和?

如何获得telerik ListView特定列的总和?

C#对象列表,我如何获得一个属性的总和

我们可以在没有循环的情况下在 oracle 中获得列表属性的总和吗?