在 Pyspark 中对多列进行累积求和的有效方法

Posted

技术标签:

【中文标题】在 Pyspark 中对多列进行累积求和的有效方法【英文标题】:efficient way to do cumulate sum on multiple columns in Pyspark 【发布时间】:2019-02-26 22:26:03 【问题描述】:

我有一张桌子看起来像:

+----+------+-----+-------+
|time|val1  |val2 |  class|
+----+------+-----+-------+
|   1|    3 |    2|      b|
|   2|    3 |    1|      b|
|   1|    2 |    4|      a|
|   2|    2 |    5|      a|
|   3|    1 |    5|      a|
+----+------+-----+-------+

现在我想对 val1 和 val2 列进行累积求和。所以我创建了一个窗口函数:

windowval = (Window.partitionBy('class').orderBy('time')
             .rangeBetween(Window.unboundedPreceding, 0))


new_df = my_df.withColumn('cum_sum1', F.sum("val1").over(windowval))
              .withColumn('cum_sum2', F.sum("val2").over(windowval))

但我认为 Spark 会在原始表上应用两次窗口函数,这似乎效率较低。由于问题非常简单,有没有办法简单地应用一次窗口函数,然后在两列上一起做累积和?

【问题讨论】:

【参考方案1】:

但我认为 Spark 会在原始表上应用两次窗口函数,这似乎效率较低。

你的假设是不正确的。看看优化后的逻辑就够了

== Optimized Logical Plan ==
Window [sum(val1#1L) windowspecdefinition(class#3, time#0L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS cum_sum1#9L, sum(val2#2L) windowspecdefinition(class#3, time#0L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS cum_sum2#16L], [class#3], [time#0L ASC NULLS FIRST]
+- LogicalRDD [time#0L, val1#1L, val2#2L, class#3], false

或物理计划

== Physical Plan ==
Window [sum(val1#1L) windowspecdefinition(class#3, time#0L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS cum_sum1#9L, sum(val2#2L) windowspecdefinition(class#3, time#0L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS cum_sum2#16L], [class#3], [time#0L ASC NULLS FIRST]
+- *(1) Sort [class#3 ASC NULLS FIRST, time#0L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(class#3, 200)
      +- Scan ExistingRDD[time#0L,val1#1L,val2#2L,class#3]

两者都清楚地表明Window 只应用一次。

【讨论】:

以上是关于在 Pyspark 中对多列进行累积求和的有效方法的主要内容,如果未能解决你的问题,请参考以下文章

如何在 pyspark 中对 spark 数据框中的多列求和?

如何在 pyspark 中对 spark 数据框中的多列求和?

pyspark中基于条件对多列进行分组的累积和函数

在 BigQuery Java UDF 中对数组进行累积求和时出现问题

如何在 SQL 中对多列求和

对集合多列进行求和方法的选择