继承满足条件的最近行的值

Posted

技术标签:

【中文标题】继承满足条件的最近行的值【英文标题】:Carrying over values from most recent row satisfying a condition 【发布时间】:2020-12-03 03:28:53 【问题描述】:

我有一个 pyspark 数据框,它看起来像(一个更大的版本)如下:

+---+---+----+----+
| id|  t|type| val|
+---+---+----+----+
|100|  1|   1|  10|
|100|  2|   0|NULL|
|100|  5|   1|  20|
|100|  8|   0|NULL|
|100| 12|   0|NULL|
|100| 20|   0|NULL|
|100| 22|   1|  30|
|200|  5|   1|  40|
|200| 11|   0|NULL|
|200| 19|   1|  50|
|200| 24|   0|NULL|
|200| 25|   0|NULL|
+---+---+----+----+

我想创建一个新列,对于 1 类型的行,使用 val,对于 0 类型,使用来自 1 类型的最新条目的 val

输出如下所示:

+---+---+----+----+----+
| id|  t|type| val|val2|
+---+---+----+----+----+
|100|  1|   1|  10|  10|
|100|  2|   0|NULL|  10|
|100|  5|   1|  20|  20|
|100|  8|   0|NULL|  20|
|100| 12|   0|NULL|  20|
|100| 20|   0|NULL|  20|
|100| 22|   1|  30|  30|
|200|  5|   1|  40|  40|
|200| 11|   0|NULL|  40|
|200| 19|   1|  50|  50|
|200| 24|   0|NULL|  50|
|200| 25|   0|NULL|  50|
+---+---+----+----+----+

如果我们在 pandas 数据框中,如何通过迭代来完成这一点相当简单,但我无法找到使用 pyspark 工具的方法。我想做的是

from pyspark.sql import Window
import pyspark.sql.functions as sf

w = Window.partitionBy(['id']).orderBy('t')
df.withColumn('val2',
  sf.when(col('type'), col('val')).otherwise(sf.lag(col('val')).over(w))
)

但这会产生

+---+---+----+----+----+
| id|  t|type| val|val2|
+---+---+----+----+----+
|100|  1|   1|  10|  10|
|100|  2|   0|NULL|  10|
|100|  5|   1|  20|  20|
|100|  8|   0|NULL|  20|
|100| 12|   0|NULL|NULL|
|100| 20|   0|NULL|NULL|
|100| 22|   1|  30|  30|
|200|  5|   1|  40|  40|
|200| 11|   0|NULL|  40|
|200| 19|   1|  50|  50|
|200| 24|   0|NULL|  50|
|200| 25|   0|NULL|NULL|
+---+---+----+----+----+

我明白为什么这不起作用,但我不知道如何解决它。我想我可以使用 groupby('id').applyInPandas(...) 和一个遍历行的函数,但这会很慢,有没有更好的方法?

【问题讨论】:

我觉得你可以先用最后一个值 (SO post) 填充空值,然后用多个 when 语句 (link) 实现你的 val2 col 【参考方案1】:

是的,您可以将最后一个函数与 ignorenulls=True 一起使用:

from pyspark.sql import Window
import pyspark.sql.functions as sf

w = Window.partitionBy(['id']).orderBy('t')
df.withColumn('val2',
sf.when(col('type'), 
col('val')).otherwise(sf.last(col('val'),ignorenulls=True).over(w)))

【讨论】:

以上是关于继承满足条件的最近行的值的主要内容,如果未能解决你的问题,请参考以下文章

映射列满足特定条件的字典中的值

R语言dataframe计算满足筛选条件的行的个数(筛选满足条件的数据行并计数):类似于excel的countif函数

R - 选择满足多个条件的矩阵行的最快方法

满足条件时更改表格中整行的颜色?

在满足条件之前返回行值的排序行的火花聚合

MYSQL 存储过程 - 如果满足某些条件,则更新多个列