Pyspark - 具有重置条件的累积和
Posted
技术标签:
【中文标题】Pyspark - 具有重置条件的累积和【英文标题】:Pyspark - Cumulative sum with reset condition 【发布时间】:2019-10-16 11:39:55 【问题描述】:我有这个数据框
+---+----+---+
| A| B| C|
+---+----+---+
| 0|null| 1|
| 1| 3.0| 0|
| 2| 7.0| 0|
| 3|null| 1|
| 4| 4.0| 0|
| 5| 3.0| 0|
| 6|null| 1|
| 7|null| 1|
| 8|null| 1|
| 9| 5.0| 0|
| 10| 2.0| 0|
| 11|null| 1|
+---+----+---+
我需要做的是 C 列中值的累积总和,直到下一个值为零。
预期输出:
+---+----+---+----+
| A| B| C| D|
+---+----+---+----+
| 0|null| 1| 1|
| 1| 3.0| 0| 0|
| 2| 7.0| 0| 0|
| 3|null| 1| 1|
| 4| 4.0| 0| 0|
| 5| 3.0| 0| 0|
| 6|null| 1| 1|
| 7|null| 1| 2|
| 8|null| 1| 3|
| 9| 5.0| 0| 0|
| 10| 2.0| 0| 0|
| 11|null| 1| 1|
+---+----+---+----+
重现数据框:
from pyspark.shell import sc
from pyspark.sql import Window
from pyspark.sql.functions import lag, when, sum
x = sc.parallelize([
[0, None], [1, 3.], [2, 7.], [3, None], [4, 4.],
[5, 3.], [6, None], [7, None], [8, None], [9, 5.], [10, 2.], [11, None]])
x = x.toDF(['A', 'B'])
# Transform null values into "1"
x = x.withColumn('C', when(x.B.isNull(), 1).otherwise(0))
【问题讨论】:
【参考方案1】:创建一个临时列 (grp
),每次列 C
等于 0
(重置条件)时递增一个计数器,并将其用作累积总和的分区列。
import pyspark.sql.functions as f
from pyspark.sql import Window
x.withColumn(
"grp",
f.sum((f.col("C") == 0).cast("int")).over(Window.orderBy("A"))
).withColumn(
"D",
f.sum(f.col("C")).over(Window.partitionBy("grp").orderBy("A"))
).drop("grp").show()
#+---+----+---+---+
#| A| B| C| D|
#+---+----+---+---+
#| 0|null| 1| 1|
#| 1| 3.0| 0| 0|
#| 2| 7.0| 0| 0|
#| 3|null| 1| 1|
#| 4| 4.0| 0| 0|
#| 5| 3.0| 0| 0|
#| 6|null| 1| 1|
#| 7|null| 1| 2|
#| 8|null| 1| 3|
#| 9| 5.0| 0| 0|
#| 10| 2.0| 0| 0|
#| 11|null| 1| 1|
#+---+----+---+---+
【讨论】:
你能不能请'grp'部分。这很有趣,但我无法理解它是如何工作的。 哇:(f.col("C") == 0).cast("int")
- 创建一个布尔值,然后将其转换为 1
,以便可以将其汇总到一个分区中。这对于某种性能实际上是必要的,还是只是“聪明”?
@stephen 我不记得说实话,但我认为这是必要的,因为sum
需要数字类型并且不会进行隐式转换。也许最新版本的 spark 处理它的方式不同。如果您尝试一下,请随时对此答案进行澄清更新。
@pault 最佳答案。救了我的命 +1以上是关于Pyspark - 具有重置条件的累积和的主要内容,如果未能解决你的问题,请参考以下文章