继承满足条件的最近行的值
Posted
技术标签:
【中文标题】继承满足条件的最近行的值【英文标题】:Carrying over values from most recent row satisfying a condition 【发布时间】:2020-12-03 03:28:53 【问题描述】:我有一个 pyspark 数据框,它看起来像(一个更大的版本)如下:
+---+---+----+----+
| id| t|type| val|
+---+---+----+----+
|100| 1| 1| 10|
|100| 2| 0|NULL|
|100| 5| 1| 20|
|100| 8| 0|NULL|
|100| 12| 0|NULL|
|100| 20| 0|NULL|
|100| 22| 1| 30|
|200| 5| 1| 40|
|200| 11| 0|NULL|
|200| 19| 1| 50|
|200| 24| 0|NULL|
|200| 25| 0|NULL|
+---+---+----+----+
我想创建一个新列,对于 1
类型的行,使用 val
,对于 0
类型,使用来自 1
类型的最新条目的 val
。
输出如下所示:
+---+---+----+----+----+
| id| t|type| val|val2|
+---+---+----+----+----+
|100| 1| 1| 10| 10|
|100| 2| 0|NULL| 10|
|100| 5| 1| 20| 20|
|100| 8| 0|NULL| 20|
|100| 12| 0|NULL| 20|
|100| 20| 0|NULL| 20|
|100| 22| 1| 30| 30|
|200| 5| 1| 40| 40|
|200| 11| 0|NULL| 40|
|200| 19| 1| 50| 50|
|200| 24| 0|NULL| 50|
|200| 25| 0|NULL| 50|
+---+---+----+----+----+
如果我们在 pandas 数据框中,如何通过迭代来完成这一点相当简单,但我无法找到使用 pyspark 工具的方法。我想做的是
from pyspark.sql import Window
import pyspark.sql.functions as sf
w = Window.partitionBy(['id']).orderBy('t')
df.withColumn('val2',
sf.when(col('type'), col('val')).otherwise(sf.lag(col('val')).over(w))
)
但这会产生
+---+---+----+----+----+
| id| t|type| val|val2|
+---+---+----+----+----+
|100| 1| 1| 10| 10|
|100| 2| 0|NULL| 10|
|100| 5| 1| 20| 20|
|100| 8| 0|NULL| 20|
|100| 12| 0|NULL|NULL|
|100| 20| 0|NULL|NULL|
|100| 22| 1| 30| 30|
|200| 5| 1| 40| 40|
|200| 11| 0|NULL| 40|
|200| 19| 1| 50| 50|
|200| 24| 0|NULL| 50|
|200| 25| 0|NULL|NULL|
+---+---+----+----+----+
我明白为什么这不起作用,但我不知道如何解决它。我想我可以使用 groupby('id').applyInPandas(...)
和一个遍历行的函数,但这会很慢,有没有更好的方法?
【问题讨论】:
我觉得你可以先用最后一个值 (SO post) 填充空值,然后用多个when
语句 (link) 实现你的 val2
col
【参考方案1】:
是的,您可以将最后一个函数与 ignorenulls=True 一起使用:
from pyspark.sql import Window
import pyspark.sql.functions as sf
w = Window.partitionBy(['id']).orderBy('t')
df.withColumn('val2',
sf.when(col('type'),
col('val')).otherwise(sf.last(col('val'),ignorenulls=True).over(w)))
【讨论】:
以上是关于继承满足条件的最近行的值的主要内容,如果未能解决你的问题,请参考以下文章