Pyspark 有条件的累积和
Posted
技术标签:
【中文标题】Pyspark 有条件的累积和【英文标题】:Pyspark Cummulative sum with conditions 【发布时间】:2018-07-11 18:31:38 【问题描述】:我有一个包含 3 列的 pyspark 数据框:
ID,每个出现多次; 日期; DELAY,如果该账单按时支付,则为 0,否则为 1。
它已按 ID 和 DATE 排序。 我需要创建一个名为 CONSECUTIVE 的列,用于显示每个 ID 以 DELAY=1 连续支付了多少连续账单。
数据示例和预期结果:
ID | DATE | DELAY | CONSECUTIVE
101 | 1 | 1 | 1
101 | 2 | 1 | 2
101 | 3 | 1 | 3
101 | 4 | 0 | 0
101 | 5 | 1 | 1
101 | 6 | 1 | 2
213 | 1 | 1 | 1
213 | 2 | 1 | 2
有没有办法在不使用 Pandas 的情况下做到这一点?如果是这样,我该怎么做?
【问题讨论】:
Python Spark Cumulative Sum by Group Using DataFrame的可能重复 查看欺骗目标-您正在寻找类似df.withColumn('CONSECUTIVE', F.sum('DELAY').over(Window.partitionBy('ID').orderBy('DATE').rangeBetween(Window.unboundedPreceding, 0))
【参考方案1】:
您可以在窗口的帮助下进行 3 次转换。
from pyspark.sql.window import Window
from pyspark.sql import functions as F
df = sqlContext.createDataFrame([
(101, 1, 1),
(101, 2, 1), # dasd
(101, 3, 0),
(101, 4, 1)
], ["id", 'date', 'delay'])
window = Window.partitionBy('id').orderBy('date')
last_value = F.last('rank').over(window.rowsBetween(-2, -1))
consecutive = F.when( F.col('delay')==0, 0) \
.otherwise( F.when(F.col('last_rank').isNull(), 1) \
.otherwise( F.col('last_rank')+1))
df \
.withColumn('rank', F.row_number().over(window)) \
.withColumn('rank', F.when(F.col('delay')!=0, F.col('rank')).otherwise(0)) \
.withColumn('last_rank', last_value) \
.withColumn('consecutive', consecutive).show()
结果:
+---+----+-----+----+---------+-----------+
| id|date|delay|rank|last_rank|consecutive|
+---+----+-----+----+---------+-----------+
|101| 1| 1| 1| null| 1|
|101| 1| 1| 2| 1| 2|
|101| 1| 0| 0| 2| 0|
|101| 1| 1| 4| 0| 1|
+---+----+-----+----+---------+-----------+
【讨论】:
以上是关于Pyspark 有条件的累积和的主要内容,如果未能解决你的问题,请参考以下文章