使用窗口函数计算 PySpark 中的累积和

Posted

技术标签:

【中文标题】使用窗口函数计算 PySpark 中的累积和【英文标题】:Calculating Cumulative sum in PySpark using Window Functions 【发布时间】:2017-10-27 16:31:04 【问题描述】:

我有以下示例数据框:

rdd = sc.parallelize([(1,20), (2,30), (3,30)])
df2 = spark.createDataFrame(rdd, ["id", "duration"])
df2.show()

+---+--------+
| id|duration|
+---+--------+
|  1|      20|
|  2|      30|
|  3|      30|
+---+--------+

我想按持续时间的 desc 顺序对这个 DataFrame 进行排序,并添加一个包含持续时间累积总和的新列。所以我做了以下事情:

windowSpec = Window.orderBy(df2['duration'].desc())

df_cum_sum = df2.withColumn("duration_cum_sum", sum('duration').over(windowSpec))

df_cum_sum.show()

+---+--------+----------------+
| id|duration|duration_cum_sum|
+---+--------+----------------+
|  2|      30|              60|
|  3|      30|              60|
|  1|      20|              80|
+---+--------+----------------+

我想要的输出是:

+---+--------+----------------+
| id|duration|duration_cum_sum|
+---+--------+----------------+
|  2|      30|              30| 
|  3|      30|              60| 
|  1|      20|              80|
+---+--------+----------------+

我如何得到这个?

这是细分:

+--------+----------------+
|duration|duration_cum_sum|
+--------+----------------+
|      30|              30| #First value
|      30|              60| #Current duration + previous cum sum value
|      20|              80| #Current duration + previous cum sum value     
+--------+----------------+

【问题讨论】:

【参考方案1】:

您可以介绍row_number来打破僵局;如果写在sql:

df2.selectExpr(
    "id", "duration", 
    "sum(duration) over (order by row_number() over (order by duration desc)) as duration_cum_sum"
 ).show()

+---+--------+----------------+
| id|duration|duration_cum_sum|
+---+--------+----------------+
|  2|      30|              30|
|  3|      30|              60|
|  1|      20|              80|
+---+--------+----------------+

【讨论】:

【参考方案2】:

你可以在这里查看

df2.withColumn('cumu', F.sum('duration').over(Window.orderBy(F.col('duration').desc()).rowsBetween(Window.unboundedPreceding, 0)
)).show()

【讨论】:

以上是关于使用窗口函数计算 PySpark 中的累积和的主要内容,如果未能解决你的问题,请参考以下文章

开窗函数之累积和,PySpark,Pandas和SQL版实现

如何在 Pyspark 2.1 中使用窗口函数来计算星期几的出现次数

如何计算pyspark表中的累积和

PySpark 中的窗口函数和条件过滤器

如何使用滚动窗口函数计算 Pyspark Dataframe 中等于某个值的相邻值的数量?

Spark 1.5.0 (PySpark) 案例当逻辑和滞后窗口函数