在 Spark 数据框中聚合时访问窗口外的行

Posted

技术标签:

【中文标题】在 Spark 数据框中聚合时访问窗口外的行【英文标题】:Accessing rows outside of window while aggregating in Spark dataframe 【发布时间】:2016-09-10 00:22:34 【问题描述】:

简而言之,在下面的示例中,我想将 'b 固定为结果将出现在的行中的值。

Given:
    a,b
    1,2
    4,6
    3,7 ==> 'special would be: (1-7 + 4-7 + 3-7) == -13 in this row

    val baseWin = Window.partitionBy("something_I_forgot").orderBy("whatever")
    val sumWin = baseWin.rowsBetween(-2, 0)

    frame.withColumn("special",sum( 'a - 'b ).over(win)  ) 

或者另一种思考方式是我想在计算总和时关闭该行,以便我可以传入 'b 的值(在本例中为 7)

* 更新 * 这是我作为 UDF 想要完成的任务。简而言之,我使用了 foldLeft。

  def mad(field : Column, numPeriods : Integer) : Column = 

    val baseWin = Window.partitionBy("exchange","symbol").orderBy("datetime")
    val win = baseWin.rowsBetween(numPeriods + 1, 0)


    val subFunc: (Seq[Double],Int) => Double =  (input: Seq[Double], numPeriods : Int) => 
      val agg = grizzled.math.stats.mean(input: _*)
      val fooBar = (1.0 / -numPeriods)*input.foldLeft(0.0)( (a,b) => a + Math.abs((b-agg)) )
      fooBar
     

    val myUdf = udf( subFunc )
    myUdf(collect_list(field.cast(DoubleType)).over(win),lit(numPeriods))
  

【问题讨论】:

【参考方案1】:

如果我正确地理解了你想要做什么,我认为你可以重构你的逻辑来实现它。按照您现在的方式,您可能会得到“-7”而不是 -13。

对于“特殊”列,(1-7 + 4-7 + 3-7),您可以像 (sum(a) - count(*) * b) 一样计算它:

dfA.withColumn("special",sum('a).over(win) - count("*").over(win) * 'b)

【讨论】:

这是一个有趣的重构。我没有立即明白为什么它给出了正确的答案。另外,我的例子是一个玩具问题来说明目标,所以它不适用于这种情况。我想要做的实际公式是 Sum(abs('a-'b)) 其中 'b 是上面的特定行值..(技术上是窗口中的最后一个值) 问题是当它进入窗口时,就像在“sum('a - 'b).over(win)”中,“b”的值不会固定,它会随着每一行。这就是为什么我将“b”放在任何窗口计算之外。如果你想要 abs,我认为你可以将表达式包装在 withColumn 中。dfA.withColumn("special", abs(sum('a).over (win) - count("*").over(win) * 'b))

以上是关于在 Spark 数据框中聚合时访问窗口外的行的主要内容,如果未能解决你的问题,请参考以下文章

在mongodb中聚合时怎么将字符串类型的时间转换成ISOdate

Spark SQL 使用窗口 - 根据列条件从当前行之后的行中收集数据

Spark窗口函数过滤不符合要求的行

对 Spark 数据框中的行进行洗牌

在不计算的情况下获取 Spark 数据框中的行数

在 spark 中比较数据框中的行,以根据行的比较为列分配值