使用窗口函数计算 PySpark 中的累积和
Posted
技术标签:
【中文标题】使用窗口函数计算 PySpark 中的累积和【英文标题】:Calculating Cumulative sum in PySpark using Window Functions 【发布时间】:2017-10-27 16:31:04 【问题描述】:我有以下示例数据框:
rdd = sc.parallelize([(1,20), (2,30), (3,30)])
df2 = spark.createDataFrame(rdd, ["id", "duration"])
df2.show()
+---+--------+
| id|duration|
+---+--------+
| 1| 20|
| 2| 30|
| 3| 30|
+---+--------+
我想按持续时间的 desc 顺序对这个 DataFrame 进行排序,并添加一个包含持续时间累积总和的新列。所以我做了以下事情:
windowSpec = Window.orderBy(df2['duration'].desc())
df_cum_sum = df2.withColumn("duration_cum_sum", sum('duration').over(windowSpec))
df_cum_sum.show()
+---+--------+----------------+
| id|duration|duration_cum_sum|
+---+--------+----------------+
| 2| 30| 60|
| 3| 30| 60|
| 1| 20| 80|
+---+--------+----------------+
我想要的输出是:
+---+--------+----------------+
| id|duration|duration_cum_sum|
+---+--------+----------------+
| 2| 30| 30|
| 3| 30| 60|
| 1| 20| 80|
+---+--------+----------------+
我如何得到这个?
这是细分:
+--------+----------------+
|duration|duration_cum_sum|
+--------+----------------+
| 30| 30| #First value
| 30| 60| #Current duration + previous cum sum value
| 20| 80| #Current duration + previous cum sum value
+--------+----------------+
【问题讨论】:
【参考方案1】:您可以介绍row_number
来打破僵局;如果写在sql
:
df2.selectExpr(
"id", "duration",
"sum(duration) over (order by row_number() over (order by duration desc)) as duration_cum_sum"
).show()
+---+--------+----------------+
| id|duration|duration_cum_sum|
+---+--------+----------------+
| 2| 30| 30|
| 3| 30| 60|
| 1| 20| 80|
+---+--------+----------------+
【讨论】:
【参考方案2】:你可以在这里查看
df2.withColumn('cumu', F.sum('duration').over(Window.orderBy(F.col('duration').desc()).rowsBetween(Window.unboundedPreceding, 0)
)).show()
【讨论】:
以上是关于使用窗口函数计算 PySpark 中的累积和的主要内容,如果未能解决你的问题,请参考以下文章
开窗函数之累积和,PySpark,Pandas和SQL版实现
如何在 Pyspark 2.1 中使用窗口函数来计算星期几的出现次数