Pyspark 有条件的累积和

Posted

技术标签:

【中文标题】Pyspark 有条件的累积和【英文标题】:Pyspark Cummulative sum with conditions 【发布时间】:2018-07-11 18:31:38 【问题描述】:

我有一个包含 3 列的 pyspark 数据框:

ID,每个出现多次; 日期DELAY,如果该账单按时支付,则为 0,否则为 1。

它已按 IDDATE 排序。 我需要创建一个名为 CONSECUTIVE 的列,用于显示每个 ID 以 DELAY=1 连续支付了多少连续账单。

数据示例和预期结果:

ID    | DATE  | DELAY  | CONSECUTIVE
101   | 1     | 1      | 1
101   | 2     | 1      | 2
101   | 3     | 1      | 3
101   | 4     | 0      | 0
101   | 5     | 1      | 1
101   | 6     | 1      | 2
213   | 1     | 1      | 1
213   | 2     | 1      | 2

有没有办法在不使用 Pandas 的情况下做到这一点?如果是这样,我该怎么做?

【问题讨论】:

Python Spark Cumulative Sum by Group Using DataFrame的可能重复 查看欺骗目标-您正在寻找类似df.withColumn('CONSECUTIVE', F.sum('DELAY').over(Window.partitionBy('ID').orderBy('DATE').rangeBetween(Window.unboundedPreceding, 0)) 【参考方案1】:

您可以在窗口的帮助下进行 3 次转换。

from pyspark.sql.window import Window
from pyspark.sql import functions as F

df = sqlContext.createDataFrame([
    (101, 1, 1),
    (101, 2, 1), # dasd
    (101, 3, 0),
    (101, 4, 1)
], ["id", 'date', 'delay'])

window = Window.partitionBy('id').orderBy('date')
last_value = F.last('rank').over(window.rowsBetween(-2, -1))
consecutive = F.when( F.col('delay')==0, 0) \
            .otherwise( F.when(F.col('last_rank').isNull(), 1) \
            .otherwise( F.col('last_rank')+1))

df \
    .withColumn('rank', F.row_number().over(window)) \
    .withColumn('rank', F.when(F.col('delay')!=0, F.col('rank')).otherwise(0)) \
    .withColumn('last_rank', last_value) \
    .withColumn('consecutive', consecutive).show()

结果:

+---+----+-----+----+---------+-----------+
| id|date|delay|rank|last_rank|consecutive|
+---+----+-----+----+---------+-----------+
|101|   1|    1|   1|     null|          1|
|101|   1|    1|   2|        1|          2|
|101|   1|    0|   0|        2|          0|
|101|   1|    1|   4|        0|          1|
+---+----+-----+----+---------+-----------+

【讨论】:

以上是关于Pyspark 有条件的累积和的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark - 获取具有条件的列的累积总和

pyspark中基于条件对多列进行分组的累积和函数

如何计算pyspark表中的累积和

使用窗口函数计算 PySpark 中的累积和

pyspark中的累积和

在 pyspark 中对大量列进行累积求和的优化方法