PySpark: Filling Missing Values Decreasingly

Posted: 2020-11-18 13:21:06

Question:

My Spark dataframe is:

Client  Date        Due_Day
A      2017-01-01   Null
A      2017-02-01   Null
A      2017-03-01   Null
A      2017-04-01   Null
A      2017-05-01   Null
A      2017-06-01   35
A      2017-07-01   Null
A      2017-08-01   Null
A      2017-09-01   Null
A      2017-10-01   Null
A      2017-11-01   Null
A      2017-12-01   Null
B      2017-01-01   Null
B      2017-02-01   Null
B      2017-03-01   Null
B      2017-04-01   Null
B      2017-05-01   Null
B      2017-06-01   Null
B      2017-07-01   Null
B      2017-08-01   Null
B      2017-09-01   Null
B      2017-10-01   78
B      2017-11-01   Null
B      2017-12-01   Null

Each client in the dataframe has exactly one non-null Due_Day.

The desired output is:

Client  Date    Due_Day    Result
A   2017-01-01  Null       -115
A   2017-02-01  Null       -85
A   2017-03-01  Null       -55      -> -25 - 30 = -55
A   2017-04-01  Null       -25      ->  5 - 30 = -25
A   2017-05-01  Null       5        ->  35 - 30 = 5
A   2017-06-01  35         35
A   2017-07-01  Null       Null     -> Still Same value (null)      
A   2017-08-01  Null       Null     -> Still Same value (null)      
A   2017-09-01  Null       Null
A   2017-10-01  Null       Null
A   2017-11-01  Null       Null
A   2017-12-01  Null       Null
B   2017-01-01  Null       -192
B   2017-02-01  Null       -162
B   2017-03-01  Null       -132
B   2017-04-01  Null       -102
B   2017-05-01  Null       -72
B   2017-06-01  Null       -42
B   2017-07-01  Null       -12       
B   2017-08-01  Null       18        -> 48 - 30 = 18
B   2017-09-01  Null       48        -> 78 - 30 = 48
B   2017-10-01  78         78
B   2017-11-01  Null       Null      -> Still Same value (null)    
B   2017-12-01  Null       Null      -> Still Same value (null) 

For each client, the Result column should decrease by 30 per month for every month before the month with the non-null Due_Day, back to the start of the year; the months after the non-null Due_Day keep their null value.

Could you help me with the PySpark code for this?

Answer 1:

This can be solved by identifying, for each row, the upcoming non-null Due_Day (the last non-null value in the frame from the current row to the end of the partition) together with its row number. Result is then that Due_Day minus 30 times the number of rows between the current row and the row containing it.

from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql.functions import col as c

data = [("A", "2017-01-01", None,),
        ("A", "2017-02-01", None,),
        ("A", "2017-03-01", None,),
        ("A", "2017-04-01", None,),
        ("A", "2017-05-01", None,),
        ("A", "2017-06-01", 35,),
        ("A", "2017-07-01", None,),
        ("A", "2017-08-01", None,),
        ("A", "2017-09-01", None,),
        ("A", "2017-10-01", None,),
        ("A", "2017-11-01", None,),
        ("A", "2017-12-01", None,),
        ("B", "2017-01-01", None,),
        ("B", "2017-02-01", None,),
        ("B", "2017-03-01", None,),
        ("B", "2017-04-01", None,),
        ("B", "2017-05-01", None,),
        ("B", "2017-06-01", None,),
        ("B", "2017-07-01", None,),
        ("B", "2017-08-01", None,),
        ("B", "2017-09-01", None,),
        ("B", "2017-10-01", 78,),
        ("B", "2017-11-01", None,),
        ("B", "2017-12-01", None,), ]

df = spark.createDataFrame(data, ("Client", "Date", "Due_Day",)).withColumn("Date", F.to_date(F.col("Date"), "yyyy-MM-dd"))

window_spec = W.partitionBy("Client").orderBy("Date")

df.withColumn("rn", F.row_number().over(window_spec))\
  .withColumn("nonNullRn", F.when(c("Due_Day").isNull(), F.lit(None)).otherwise(c("rn")))\
  .withColumn("nonNullDue_Day", F.last("Due_Day", ignorenulls=True).over(window_spec.rowsBetween(W.currentRow, W.unboundedFollowing)))\
  .withColumn("nonNullRn", F.last("nonNullRn", ignorenulls=True).over(window_spec.rowsBetween(W.currentRow, W.unboundedFollowing)))\
  .withColumn("Result", c("nonNullDue_Day") - (F.lit(30) * (c("nonNullRn") - c("rn"))))\
  .select("Client", "Date", "Due_Day", "Result")\
  .show(200)

Output

+------+----------+-------+------+
|Client|      Date|Due_Day|Result|
+------+----------+-------+------+
|     A|2017-01-01|   null|  -115|
|     A|2017-02-01|   null|   -85|
|     A|2017-03-01|   null|   -55|
|     A|2017-04-01|   null|   -25|
|     A|2017-05-01|   null|     5|
|     A|2017-06-01|     35|    35|
|     A|2017-07-01|   null|  null|
|     A|2017-08-01|   null|  null|
|     A|2017-09-01|   null|  null|
|     A|2017-10-01|   null|  null|
|     A|2017-11-01|   null|  null|
|     A|2017-12-01|   null|  null|
|     B|2017-01-01|   null|  -192|
|     B|2017-02-01|   null|  -162|
|     B|2017-03-01|   null|  -132|
|     B|2017-04-01|   null|  -102|
|     B|2017-05-01|   null|   -72|
|     B|2017-06-01|   null|   -42|
|     B|2017-07-01|   null|   -12|
|     B|2017-08-01|   null|    18|
|     B|2017-09-01|   null|    48|
|     B|2017-10-01|     78|    78|
|     B|2017-11-01|   null|  null|
|     B|2017-12-01|   null|  null|
+------+----------+-------+------+
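
For reference, here is a minimal alternative sketch (assuming the same df as above) that derives the month gap directly from the Date column with months_between instead of row numbers, so it would also work if some monthly rows were missing:

from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql.functions import col as c

window_spec = W.partitionBy("Client").orderBy("Date")
# Frame from the current row to the end of the client's partition.
fwd = window_spec.rowsBetween(W.currentRow, W.unboundedFollowing)

# The upcoming non-null Due_Day, and the Date of the row holding it.
due_val = F.last("Due_Day", ignorenulls=True).over(fwd)
due_date = F.last(F.when(c("Due_Day").isNotNull(), c("Date")), ignorenulls=True).over(fwd)

# Subtract 30 for every month separating the current row from that Date.
df.withColumn("Result", (due_val - F.lit(30) * F.months_between(due_date, c("Date"))).cast("int"))\
  .select("Client", "Date", "Due_Day", "Result")\
  .show(200)

Both versions produce the same output on this data; the row-number version relies on every month being present exactly once per client.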
