如何在 PySpark 中的每个分区中回填空值

Posted

技术标签:

【中文标题】如何在 PySpark 中的每个分区中回填空值【英文标题】:How to backfill null values in each partition in PySpark 【发布时间】:2020-01-10 13:47:07 【问题描述】:

我在 PySpark 中有以下 DataFrame:

Id      DateActual          DateStart               DateEnd                 SourceCode
107 2019-08-11 00:00:00     null                    null                    1111
107 2019-08-16 00:00:00     2019-08-11 00:00:00     2019-08-18 00:00:00     1111
128 2019-02-11 00:00:00     null                    null                    101
128 2019-02-13 00:00:00     2019-02-11 00:00:00     2019-02-18 00:00:00     168
128 2019-02-14 00:00:00     2019-02-13 00:00:00     2019-02-20 00:00:00     187

我需要替换 null 值以获得以下结果:

Id      DateActual          DateStart               DateEnd                 SourceCode
107 2019-08-11 00:00:00     2019-08-11 00:00:00     2019-08-18 00:00:00     1111
107 2019-08-16 00:00:00     2019-08-11 00:00:00     2019-08-18 00:00:00     1111
128 2019-02-11 00:00:00     2019-02-11 00:00:00     2019-02-18 00:00:00     101
128 2019-02-13 00:00:00     2019-02-11 00:00:00     2019-02-18 00:00:00     168
128 2019-02-14 00:00:00     2019-02-13 00:00:00     2019-02-20 00:00:00     187

基本上,具有null 值的DateStartDateEnd 等于NEXT 行的DateStartDateEnd,如果它具有相同的Id

如何按照上述 PySpark 中的逻辑填写null 值?

数据帧:

df = (
    sc.parallelize([
        (107, "2019-08-11 00:00:00", None, None, 1111),
        (107, "2019-08-16 00:00:00", "2019-08-11 00:00:00", "2019-08-18 00:00:00", 1111),
        (128, "2019-02-11 00:00:00", None, None, 101), 
        (128, "2019-02-13 00:00:00", "2019-02-11 00:00:00", "2019-02-11 00:00:00", 168), 
        (128, "2019-02-14 00:00:00", "2019-02-13 00:00:00", "2019-02-20 00:00:00", 187)
    ]).toDF(["Id", "DateActual", "DateStart", "DateEnd", "SourceCode"])
)

这是我尝试过的:

from pyspark.sql.functions import col, when 
import pyspark.sql.functions as F
from pyspark.sql.window import Window  

my_window = Window.partitionBy("Id").orderBy("DateActual")

df.withColumn("DateStart_start", when(col("DateStart").isNull(), F.lag(df.DateStart).over(my_window)).otherwise(col("DateStart"))).show()

我不需要像df.na.fill(0) 这样的简单解决方案。我需要用 NEXT ROW 值替换 null 值,这可能假设使用 lag 或其他类似函数。

【问题讨论】:

我猜他们对你的问题投了反对票,因为你的问题已经存在于 SO 中。您应该首先搜索以查看您的问题是否已被提出。如果是这样,最好避免发布多余的问题。这篇文章是否有机会回答您的问题? ***.com/questions/42312042/… @alift:请在投票前仔细阅读问题。在您推荐的帖子中,df.na.fill(0) 用 0 填充 Null 值。这太微不足道了。我需要遵循我帖子中描述的逻辑。 【参考方案1】:

使用来自pyspark.sql.functionsfirst

from pyspark.sql import Window
from pyspark.sql.functions import first

# define the window
window = Window.partitionBy('Id')\
               .orderBy('DateActual')\
               .rowsBetween(0,sys.maxsize)

# define the back-filled column
filled_column_start = first(spark_df['DateStart'], ignorenulls=True).over(window)
filled_column_end = first(spark_df['DateEnd'], ignorenulls=True).over(window)

# do the fill
spark_df_filled = spark_df.withColumn('filled_start', filled_column_start)
spark_df_filled = spark_df_filled .withColumn('filled_end', filled_column_end)

# show off our glorious achievements
spark_df_filled.orderBy('Id').show(10)  

【讨论】:

我认为问题的关键词是在每个分区中回填空值。您可以修改您的帖子标题和正文以使其更加清晰。这样你的问题可能会帮助很多人。@Fluxy

以上是关于如何在 PySpark 中的每个分区中回填空值的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark RDD 中,如何使用 foreachPartition() 打印出每个分区的第一条记录?

Pyspark - 计算每个数据框列中的空值数量

PySpark 如何在查询结果中获取分区名称?

在 BigQuery 中回填 Google Analytics

计算每个 pyspark RDD 分区中的元素数

如何在pyspark数据帧中过滤空值?