我想用 Pyspark 中的最后一行值填充缺失值:

Posted

技术标签:

【中文标题】我想用 Pyspark 中的最后一行值填充缺失值:【英文标题】:I want to fill Missing value with last row value in Pyspark: 【发布时间】:2018-10-11 12:06:36 【问题描述】:

我的df 有多个列

我试过的查询:

df=df.withColumn('Column_required',F.when(df.Column_present>1,df.Column_present).otherwise(lag(df.Column_present))

否则无法工作。 .我要操作的列:

Column_present       Column_required
40000                 40000
Null                  40000
Null                  40000
500                   500
Null                  500
Null                  500

【问题讨论】:

请尝试通过使用格式选项使您的问题更具可读性,很难理解您到底想要达到什么目的......编辑:我看到您在输入我的评论时这样做了:) 这些可能对(1) fill with last observation和forward fill missing values有帮助 【参考方案1】:

我认为您的解决方案可能是使用last 而不是滞后:

df_new = spark.createDataFrame([
(1, 40000), (2, None),  (3,None), (4,None),
(5,500), (6,None), (7,None)
], ("id", "Col_present"))

df_new.withColumn('Column_required',when(df_new.Col_present>1,df_new.Col_present).otherwise(last(df_new.Col_present,ignorenulls=True).over(Window.orderBy("id")))).show()

这将产生您想要的输出:

+---+-----------+---------------+
| id|Col_present|Column_required|
+---+-----------+---------------+
|  1|      40000|          40000|
|  2|       null|          40000|
|  3|       null|          40000|
|  4|       null|          40000|
|  5|        500|            500|
|  6|       null|            500|
|  7|       null|            500|
+---+-----------+---------------+

但请注意,window 函数需要一个列来执行排序。这就是我在示例中使用id 列的原因。如果您的数据框不包含带有monotonically_increasing_id() 的可排序列,您可以自己创建一个 id 列。

【讨论】:

谢谢它的工作!你能解释一下 .over(Window) 的作用吗? 滞后函数需要一个值列表来选择前一个值。 window 可用于创建此列表,在这种情况下,它只需要数据执行排序并生成此集合。所以通常你不应该在没有window 的情况下应用lag 函数,因为它不知道从哪里获取以前的值....我希望它现在变得更清楚了 你好,gaw,你的解决方案在 jupyter 上运行良好,但是当我在终端上批量提交它时,它给了我错误(ignorenulls 是一个意外的关键字),你能告诉我我怎么能运行它批量提交?感谢您的帮助 如果你想批量运行它,你必须确保你导入 sql.functions 以便它知道函数。如果您随后使用与 jupyter 中相同的 spark 版本,它应该可以正常工作。不幸的是,该解决方案依赖于 ignorenulls,因为您只想考虑数值。您也可以尝试在不指定关键字的情况下运行代码,并将True 作为第二个参数传递:...last(df_new.Col_present,True).over...

以上是关于我想用 Pyspark 中的最后一行值填充缺失值:的主要内容,如果未能解决你的问题,请参考以下文章

R数据框 - 用另一列的条件填充缺失值

Pyspark 用递减填充缺失值

在pyspark中用平均值填充缺失值

Pyspark 以递减的方式填充缺失值

在pyspark中填充每组的缺失值?

填写缺失的日期值并根据前一行填充第二列