PySpark: Filling Missing Values in Decreasing Steps
Posted: 2020-11-18 13:21:06

Question: My Spark dataframe is:
Client Date Due_Day
A 2017-01-01 Null
A 2017-02-01 Null
A 2017-03-01 Null
A 2017-04-01 Null
A 2017-05-01 Null
A 2017-06-01 35
A 2017-07-01 Null
A 2017-08-01 Null
A 2017-09-01 Null
A 2017-10-01 Null
A 2017-11-01 Null
A 2017-12-01 Null
B 2017-01-01 Null
B 2017-02-01 Null
B 2017-03-01 Null
B 2017-04-01 Null
B 2017-05-01 Null
B 2017-06-01 Null
B 2017-07-01 Null
B 2017-08-01 Null
B 2017-09-01 Null
B 2017-10-01 78
B 2017-11-01 Null
B 2017-12-01 Null
Each Client in the dataframe has exactly one non-null Due_Day.

The desired output is:
Client Date Due_Day Result
A 2017-01-01 Null -115
A 2017-02-01 Null -85
A 2017-03-01 Null -55 -> -25 - 30 = -55
A 2017-04-01 Null -25 -> 5 - 30 = -25
A 2017-05-01 Null 5 -> 35 - 30 = 5
A 2017-06-01 35 35
A 2017-07-01 Null Null -> Still Same value (null)
A 2017-08-01 Null Null -> Still Same value (null)
A 2017-09-01 Null Null
A 2017-10-01 Null Null
A 2017-11-01 Null Null
A 2017-12-01 Null Null
B 2017-01-01 Null -192
B 2017-02-01 Null -162
B 2017-03-01 Null -132
B 2017-04-01 Null -102
B 2017-05-01 Null -72
B 2017-06-01 Null -42
B 2017-07-01 Null -12
B 2017-08-01 Null 18 -> 48 - 30 = 18
B 2017-09-01 Null 48 -> 78 - 30 = 48
B 2017-10-01 78 78
B 2017-11-01 Null Null -> Still Same value (null)
B 2017-12-01 Null Null -> Still Same value (null)
For each client, in the months before the non-null Due_Day (back to the start of the year), the value in the Result column should decrease by 30 days per month; the months after the non-null Due_Day keep their null value.

Could you help me with the PySpark code for this?
Answer 1: This can be solved by identifying the last non-null Due_Day and its corresponding row number within each client's partition. To compute the Result, subtract 30, multiplied by the number of rows between the current row and the row containing the non-null Due_Day, from that Due_Day value. Because the lookup uses last(..., ignorenulls=True) over a frame that runs from the current row to the end of the partition, rows before the non-null Due_Day pick it up, while rows after it find no non-null value and therefore stay null, exactly as the desired output requires.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql.functions import col as c

spark = SparkSession.builder.getOrCreate()

data = [("A", "2017-01-01", None),
        ("A", "2017-02-01", None),
        ("A", "2017-03-01", None),
        ("A", "2017-04-01", None),
        ("A", "2017-05-01", None),
        ("A", "2017-06-01", 35),
        ("A", "2017-07-01", None),
        ("A", "2017-08-01", None),
        ("A", "2017-09-01", None),
        ("A", "2017-10-01", None),
        ("A", "2017-11-01", None),
        ("A", "2017-12-01", None),
        ("B", "2017-01-01", None),
        ("B", "2017-02-01", None),
        ("B", "2017-03-01", None),
        ("B", "2017-04-01", None),
        ("B", "2017-05-01", None),
        ("B", "2017-06-01", None),
        ("B", "2017-07-01", None),
        ("B", "2017-08-01", None),
        ("B", "2017-09-01", None),
        ("B", "2017-10-01", 78),
        ("B", "2017-11-01", None),
        ("B", "2017-12-01", None)]

df = (spark.createDataFrame(data, ("Client", "Date", "Due_Day"))
      .withColumn("Date", F.to_date(c("Date"), "yyyy-MM-dd")))

window_spec = W.partitionBy("Client").orderBy("Date")
forward_frame = window_spec.rowsBetween(W.currentRow, W.unboundedFollowing)

(df
 # Number the rows of each client in date order.
 .withColumn("rn", F.row_number().over(window_spec))
 # Keep the row number only on the row where Due_Day is non-null.
 .withColumn("nonNullRn", F.when(c("Due_Day").isNull(), F.lit(None)).otherwise(c("rn")))
 # Looking from the current row forward, pick up the non-null Due_Day and its
 # row number; rows after it find nothing and remain null.
 .withColumn("nonNullDue_Day", F.last("Due_Day", ignorenulls=True).over(forward_frame))
 .withColumn("nonNullRn", F.last("nonNullRn", ignorenulls=True).over(forward_frame))
 # Subtract 30 for every row of distance to the non-null Due_Day.
 .withColumn("Result", c("nonNullDue_Day") - (F.lit(30) * (c("nonNullRn") - c("rn"))))
 .select("Client", "Date", "Due_Day", "Result")
 .show(200))
Output:
+------+----------+-------+------+
|Client| Date|Due_Day|Result|
+------+----------+-------+------+
| A|2017-01-01| null| -115|
| A|2017-02-01| null| -85|
| A|2017-03-01| null| -55|
| A|2017-04-01| null| -25|
| A|2017-05-01| null| 5|
| A|2017-06-01| 35| 35|
| A|2017-07-01| null| null|
| A|2017-08-01| null| null|
| A|2017-09-01| null| null|
| A|2017-10-01| null| null|
| A|2017-11-01| null| null|
| A|2017-12-01| null| null|
| B|2017-01-01| null| -192|
| B|2017-02-01| null| -162|
| B|2017-03-01| null| -132|
| B|2017-04-01| null| -102|
| B|2017-05-01| null| -72|
| B|2017-06-01| null| -42|
| B|2017-07-01| null| -12|
| B|2017-08-01| null| 18|
| B|2017-09-01| null| 48|
| B|2017-10-01| 78| 78|
| B|2017-11-01| null| null|
| B|2017-12-01| null| null|
+------+----------+-------+------+
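For reference, since the Date column advances by exactly one calendar month per row, the same Result can be derived without the row-number bookkeeping by measuring the month gap with months_between. The sketch below is a hypothetical alternative under that assumption (plus the one-non-null-Due_Day-per-client assumption from the question); the column names due_val and due_date are illustrative, not from the answer above, and df, F, W, and c are reused from the code above.

w = W.partitionBy("Client")

alt = (df
       # Broadcast the client's single non-null Due_Day and its Date to every row.
       .withColumn("due_val", F.max("Due_Day").over(w))
       .withColumn("due_date", F.max(F.when(c("Due_Day").isNotNull(), c("Date"))).over(w))
       # Rows on or before the due date get a value; later rows stay null
       # because when() without otherwise() yields null.
       .withColumn("Result",
                   F.when(c("Date") <= c("due_date"),
                          c("due_val") - F.lit(30) *
                          F.months_between(c("due_date"), c("Date")).cast("int")))
       .select("Client", "Date", "Due_Day", "Result"))

alt.show(24)

Either way produces the table above; the months_between version simply trades the forward-looking window frame for two whole-partition aggregates.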