在pyspark中填充每组的缺失值?

Posted

技术标签:

【中文标题】在pyspark中填充每组的缺失值?【英文标题】:Filling missing values per group in pyspark? 【发布时间】:2020-08-30 13:57:24 【问题描述】:

我有一个如下所示的 DataFrame:

| date      | week | quantity |
|-----------|------|----------|
| 1/1/2020  | 1    | 17       |
| 1/2/2020  | 1    | 15       |
| 1/3/2020  | 1    | 9        |
| 1/4/2020  | 1    |          |
| 1/5/2020  | 1    |          |
| 1/6/2020  | 1    | 3        |
| 1/7/2020  | 1    | 3        |
| 1/8/2020  | 2    | 10       |
| 1/9/2020  | 2    | 8        |
| 1/10/2020 | 2    | 8        |
| 1/11/2020 | 2    |          |
| 1/12/2020 | 2    |          |
| 1/13/2020 | 2    |          |
| 1/14/2020 | 2    | 8        |

我需要通过在缺失日期之间分配最后一个已知值来估算缺失值。因此,对于第 1 周,我的输出应如下所示:

| date     | week | quantity | quant_dist |
|----------|------|----------|------------|
| 1/1/2020 | 1    | 17       | 17         |
| 1/2/2020 | 1    | 15       | 15         |
| 1/3/2020 | 1    | 9        | 9          |
| 1/4/2020 | 1    |          | 1          |
| 1/5/2020 | 1    |          | 1          |
| 1/6/2020 | 1    | 3        | 1          |
| 1/7/2020 | 1    | 3        | 3          |

第 2 周应该是这样的:

| date      | week | quantity | quant_dist |
|-----------|------|----------|------------|
| 1/8/2020  | 2    | 10       | 10         |
| 1/9/2020  | 2    | 8        | 8          |
| 1/10/2020 | 2    | 8        | 8          |
| 1/11/2020 | 2    |          | 2          |
| 1/12/2020 | 2    |          | 2          |
| 1/13/2020 | 2    |          | 2          |
| 1/14/2020 | 2    | 8        | 2          |

我已经坚持了 2 多天了。有什么建议吗?

【问题讨论】:

这能回答你的问题吗? Forward fill missing values in Spark/Python 不完全向前填充.. 【参考方案1】:

有很多功能和Windows,我可以实现它。

from pyspark.sql.functions import *
from pyspark.sql import Window

w1 = Window.orderBy('date').rowsBetween(Window.currentRow, Window.unboundedFollowing)
w2 = Window.partitionBy('group')

df.withColumn('date', to_date('date', 'MM/dd/yyyy')) \
  .withColumn('group', sum(when(col('quantity').isNull(), 0).otherwise(1)).over(w1)) \
  .withColumn('count', count(when(col('quantity').isNull(), 1).otherwise(0)).over(w2)) \
  .withColumn('quantity', first('quantity', ignorenulls=True).over(w1) / col('count')).show()

+----------+----+--------+-----+-----+
|      date|week|quantity|group|count|
+----------+----+--------+-----+-----+
|2020-01-01|   1|    17.0|    9|    1|
|2020-01-02|   1|    15.0|    8|    1|
|2020-01-03|   1|     9.0|    7|    1|
|2020-01-04|   1|     1.0|    6|    3|
|2020-01-05|   1|     1.0|    6|    3|
|2020-01-06|   1|     1.0|    6|    3|
|2020-01-07|   1|     3.0|    5|    1|
|2020-01-08|   2|    10.0|    4|    1|
|2020-01-09|   2|     8.0|    3|    1|
|2020-01-10|   2|     8.0|    2|    1|
|2020-01-11|   2|     2.0|    1|    4|
|2020-01-12|   2|     2.0|    1|    4|
|2020-01-13|   2|     2.0|    1|    4|
|2020-01-14|   2|     2.0|    1|    4|
+----------+----+--------+-----+-----+

【讨论】:

非常感谢......我没有太多声望来支持这个答案 但是你可以检查这个作为答案,也许:)没问题。

以上是关于在pyspark中填充每组的缺失值?的主要内容,如果未能解决你的问题,请参考以下文章

在pyspark中用平均值填充缺失值

Pyspark 用递减填充缺失值

我想用 Pyspark 中的最后一行值填充缺失值:

Pyspark 以递减的方式填充缺失值

熊猫:在每组中按平均值填充缺失值

熊猫:在每组中按平均值填充缺失值