pyspark中的累积和

Posted

技术标签:

【中文标题】pyspark中的累积和【英文标题】:Cumulative sum in pyspark 【发布时间】:2020-04-17 14:02:33 【问题描述】:

我正在尝试计算每个班级的累积总和。代码使用 sum(df.value).over(Window.partitionBy('class').orderBy('time'))

df = sqlContext.createDataFrame( [(1,10,"a"),(3,2,"a"),(1,2,"b"),(2,5,"a"),(2,1,"b"),(9,0,"b"),(4,1,"b"),(7,8,"a"),(3,8,"b"),(2,5,"a"),(0,0,"a"),(4,3,"a")], 
                                     ["time", "value", "class"] )

time|value|class|
+----+-----+-----+
|   1|   10|    a|
|   3|    2|    a|
|   1|    2|    b|
|   2|    5|    a|
|   2|    1|    b|
|   9|    0|    b|
|   4|    1|    b|
|   7|    8|    a|
|   3|    8|    b|
|   2|    5|    a|
|   0|    0|    a|
|   4|    3|    a|


df.withColumn('cumsum_value', sum(df.value).over(Window.partitionBy('class').orderBy('time'))).show()


time|value|class|cumsum_value|
+----+-----+-----+------------+
|   1|    2|    b|           2|
|   2|    1|    b|           3|
|   3|    8|    b|          11|
|   4|    1|    b|          12|
|   9|    0|    b|          12|
|   0|    0|    a|           0|
|   1|   10|    a|          10|
|   2|    5|    a|          20|
|   2|    5|    a|          20|
|   3|    2|    a|          22|
|   4|    3|    a|          25|
|   7|    8|    a|          33|
+----+-----+-----+------------+

但它不适用于重复的行。期望的输出应该是:

 time|value|class|cumsum_value|
+----+-----+-----+------------+
|   1|    2|    b|           2|
|   2|    1|    b|           3|
|   3|    8|    b|          11|
|   4|    1|    b|          12|
|   9|    0|    b|          12|
|   0|    0|    a|           0|
|   1|   10|    a|          10|
|   2|    5|    a|          15|
|   2|    5|    a|          20|
|   3|    2|    a|          22|
|   4|    3|    a|          25|
|   7|    8|    a|          33|
+----+-----+-----+------------+

【问题讨论】:

好像你想要.orderBy('time', 'value')(即你必须定义在时间相同的情况下如何打破平局) 【参考方案1】:

添加到 @pault 的评论中,我建议基于 orderBy('time', 'value') 进行 row_number() 计算,然后使用它在另一个窗口 (w2) 的 orderBy 列中获取您的 cum_sum

这将处理时间相同且值相同的两种情况,以及时间相同但值不同的两种情况。

from pyspark.sql import functions as F
from pyspark.sql.window import Window
w1=Window().partitionBy("class").orderBy("time","value")
w2=Window().partitionBy("class").orderBy('rownum')
df.withColumn('rownum', F.row_number().over(w1))\
  .withColumn('cumsum_value', F.sum("value").over(w2)).drop('rownum').show()

+----+-----+-----+------------+
|time|value|class|cumsum_value|
+----+-----+-----+------------+
|   1|    2|    b|           2|
|   2|    1|    b|           3|
|   3|    8|    b|          11|
|   4|    1|    b|          12|
|   9|    0|    b|          12|
|   0|    0|    a|           0|
|   1|   10|    a|          10|
|   2|    5|    a|          15|
|   2|    5|    a|          20|
|   3|    2|    a|          22|
|   4|    3|    a|          25|
|   7|    8|    a|          33|
+----+-----+-----+------------+

【讨论】:

以上是关于pyspark中的累积和的主要内容,如果未能解决你的问题,请参考以下文章

使用窗口函数计算 PySpark 中的累积和

Pyspark 有条件的累积和

Pyspark - 具有重置条件的累积和

Pyspark - 获取具有条件的列的累积总和

开窗函数之累积和,PySpark,Pandas和SQL版实现

在 pyspark 中对大量列进行累积求和的优化方法