PySpark: Fill Missing Values with a Decreasing Sequence

Posted 2020-12-30 19:58:23

【Problem description】:

My input Spark dataframe is:

Date        Client  Values
2020-10-26  1       NULL
2020-10-27  1       NULL
2020-10-28  1       NULL
2020-10-29  1       6
2020-10-30  1       NULL
2020-10-31  1       NULL
2020-11-01  1       NULL
2020-11-02  1       NULL
2020-11-03  1       NULL
2020-11-04  1       NULL
2020-11-05  1       NULL
2020-11-06  1       NULL
2020-11-07  1       5
2020-11-08  1       9
2020-11-09  1       NULL
2020-10-26  2       NULL
2020-10-27  2       NULL
2020-10-28  2       NULL
2020-10-29  2       10
2020-10-30  2       6
2020-10-31  2       NULL
2020-11-01  2       NULL
2020-11-02  2       NULL
2020-11-03  2       NULL
2020-11-04  2       NULL
2020-11-05  2       3
2020-11-06  2       NULL
2020-11-07  2       NULL
2020-11-08  2       10
2020-11-09  2       1
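
For reference, a minimal sketch that reproduces this input (it assumes an active SparkSession named spark and date/integer column types, which are not stated in the question):

# Reproduce the sample input: 15 consecutive dates per client,
# with nulls everywhere except the handful of known values.
from datetime import date, timedelta

from pyspark.sql import SparkSession
from pyspark.sql.types import DateType, IntegerType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField('Date', DateType()),
    StructField('Client', IntegerType()),
    StructField('Values', IntegerType()),
])

# Values per client, aligned with the dates 2020-10-26 .. 2020-11-09.
values = {
    1: [None, None, None, 6, None, None, None, None, None, None, None, None, 5, 9, None],
    2: [None, None, None, 10, 6, None, None, None, None, None, 3, None, None, 10, 1],
}

start = date(2020, 10, 26)
rows = [
    (start + timedelta(days=i), client, v)
    for client, vals in values.items()
    for i, v in enumerate(vals)
]
df = spark.createDataFrame(rows, schema)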

For each client, I want to fill the null values that precede a non-null value, decreasing by 1 for each row going backwards. The filling should continue until the value reaches 1 or another non-null value is encountered. Based on the data above, the desired output is:

Date        Client  Values
2020-10-26  1       3   .
2020-10-27  1       4   .
2020-10-28  1       5   .
2020-10-29  1       6   -> First non-null value after nulls. Fill the previous rows, decreasing by 1 (5, 4, 3).
2020-10-30  1       NULL
2020-10-31  1       NULL
2020-11-01  1       NULL
2020-11-02  1       NULL
2020-11-03  1       1  .
2020-11-04  1       2  .
2020-11-05  1       3  .
2020-11-06  1       4  .
2020-11-07  1       5  -> First non-null value after nulls. Fill the previous rows, decreasing by 1 (4, 3, 2, 1).
2020-11-08  1       9
2020-11-09  1       NULL
2020-10-26  2       7   .
2020-10-27  2       8   .
2020-10-28  2       9   .
2020-10-29  2       10  -> First non-null value after nulls. Fill the previous rows, decreasing by 1 (9, 8, 7).
2020-10-30  2       6
2020-10-31  2       NULL
2020-11-01  2       NULL
2020-11-02  2       NULL
2020-11-03  2       1
2020-11-04  2       2
2020-11-05  2       3
2020-11-06  2       8
2020-11-07  2       9
2020-11-08  2       10
2020-11-09  2       1

Could you help me solve this problem?


【Answer 1】:
from pyspark.sql import functions as F, Window
w = Window.partitionBy('Client').orderBy('Date')

result = df.withColumn(
    'rn',
    F.row_number().over(w)    # row number within each client, ordered by date
).withColumn(
    'rn2',    # row number of the next non-null Values (current row included)
    F.first(F.when(F.col('Values').isNotNull(), F.col('rn')), ignorenulls=True)
     .over(w.rowsBetween(0, Window.unboundedFollowing))
).withColumn(
    'value2',    # next non-null value minus the distance (in rows) to it
    F.first('Values', ignorenulls=True)
     .over(w.rowsBetween(0, Window.unboundedFollowing))
    + F.col('rn') - F.col('rn2')
).withColumn(
    'Values',
    F.when(F.col('value2') > 0, F.col('value2'))   # keep only positive values; others stay null
).select(df.columns)

result.show(99, truncate=False)
+----------+------+------+
|Date      |Client|Values|
+----------+------+------+
|2020-10-26|1     |3     |
|2020-10-27|1     |4     |
|2020-10-28|1     |5     |
|2020-10-29|1     |6     |
|2020-10-30|1     |null  |
|2020-10-31|1     |null  |
|2020-11-01|1     |null  |
|2020-11-02|1     |null  |
|2020-11-03|1     |1     |
|2020-11-04|1     |2     |
|2020-11-05|1     |3     |
|2020-11-06|1     |4     |
|2020-11-07|1     |5     |
|2020-11-08|1     |9     |
|2020-11-09|1     |null  |
|2020-10-26|2     |7     |
|2020-10-27|2     |8     |
|2020-10-28|2     |9     |
|2020-10-29|2     |10    |
|2020-10-30|2     |6     |
|2020-10-31|2     |null  |
|2020-11-01|2     |null  |
|2020-11-02|2     |null  |
|2020-11-03|2     |1     |
|2020-11-04|2     |2     |
|2020-11-05|2     |3     |
|2020-11-06|2     |8     |
|2020-11-07|2     |9     |
|2020-11-08|2     |10    |
|2020-11-09|2     |1     |
+----------+------+------+

Behind the scenes, the intermediate columns look like this: rn2 is the row number of the next non-null value (looking forward from the current row), and value2 is that next non-null value minus the distance in rows, i.e. value2 = (Values at rn2) + rn - rn2. For client 1 on 2020-10-26 that gives 6 + 1 - 4 = 3; non-positive results are nulled out in the final step.

+----------+------+------+---+----+------+
|Date      |Client|Values|rn |rn2 |value2|
+----------+------+------+---+----+------+
|2020-10-26|1     |3     |1  |4   |3     |
|2020-10-27|1     |4     |2  |4   |4     |
|2020-10-28|1     |5     |3  |4   |5     |
|2020-10-29|1     |6     |4  |4   |6     |
|2020-10-30|1     |null  |5  |13  |-3    |
|2020-10-31|1     |null  |6  |13  |-2    |
|2020-11-01|1     |null  |7  |13  |-1    |
|2020-11-02|1     |null  |8  |13  |0     |
|2020-11-03|1     |1     |9  |13  |1     |
|2020-11-04|1     |2     |10 |13  |2     |
|2020-11-05|1     |3     |11 |13  |3     |
|2020-11-06|1     |4     |12 |13  |4     |
|2020-11-07|1     |5     |13 |13  |5     |
|2020-11-08|1     |9     |14 |14  |9     |
|2020-11-09|1     |null  |15 |null|null  |
|2020-10-26|2     |7     |1  |4   |7     |
|2020-10-27|2     |8     |2  |4   |8     |
|2020-10-28|2     |9     |3  |4   |9     |
|2020-10-29|2     |10    |4  |4   |10    |
|2020-10-30|2     |6     |5  |5   |6     |
|2020-10-31|2     |null  |6  |11  |-2    |
|2020-11-01|2     |null  |7  |11  |-1    |
|2020-11-02|2     |null  |8  |11  |0     |
|2020-11-03|2     |1     |9  |11  |1     |
|2020-11-04|2     |2     |10 |11  |2     |
|2020-11-05|2     |3     |11 |11  |3     |
|2020-11-06|2     |8     |12 |14  |8     |
|2020-11-07|2     |9     |13 |14  |9     |
|2020-11-08|2     |10    |14 |14  |10    |
|2020-11-09|2     |1     |15 |15  |1     |
+----------+------+------+---+----+------+
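
As a quick sanity check, a minimal sketch (assuming the result dataframe and the F import from above) that compares client 1's filled values against the expected output from the question:

# Collect client 1's rows in date order and compare the filled Values
# column against the expected sequence from the question.
expected = [3, 4, 5, 6, None, None, None, None, 1, 2, 3, 4, 5, 9, None]
actual = [
    row['Values']
    for row in result.filter(F.col('Client') == 1).orderBy('Date').collect()
]
assert actual == expected, f"mismatch: {actual}"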

