如何在 PySpark 中的每个分区中回填空值
Posted
技术标签:
【中文标题】如何在 PySpark 中的每个分区中回填空值【英文标题】:How to backfill null values in each partition in PySpark 【发布时间】:2020-01-10 13:47:07 【问题描述】:我在 PySpark 中有以下 DataFrame:
Id DateActual DateStart DateEnd SourceCode
107 2019-08-11 00:00:00 null null 1111
107 2019-08-16 00:00:00 2019-08-11 00:00:00 2019-08-18 00:00:00 1111
128 2019-02-11 00:00:00 null null 101
128 2019-02-13 00:00:00 2019-02-11 00:00:00 2019-02-18 00:00:00 168
128 2019-02-14 00:00:00 2019-02-13 00:00:00 2019-02-20 00:00:00 187
我需要替换 null
值以获得以下结果:
Id DateActual DateStart DateEnd SourceCode
107 2019-08-11 00:00:00 2019-08-11 00:00:00 2019-08-18 00:00:00 1111
107 2019-08-16 00:00:00 2019-08-11 00:00:00 2019-08-18 00:00:00 1111
128 2019-02-11 00:00:00 2019-02-11 00:00:00 2019-02-18 00:00:00 101
128 2019-02-13 00:00:00 2019-02-11 00:00:00 2019-02-18 00:00:00 168
128 2019-02-14 00:00:00 2019-02-13 00:00:00 2019-02-20 00:00:00 187
基本上,具有null
值的DateStart
和DateEnd
等于NEXT 行的DateStart
和DateEnd
,如果它具有相同的Id
。
如何按照上述 PySpark 中的逻辑填写null
值?
数据帧:
df = (
sc.parallelize([
(107, "2019-08-11 00:00:00", None, None, 1111),
(107, "2019-08-16 00:00:00", "2019-08-11 00:00:00", "2019-08-18 00:00:00", 1111),
(128, "2019-02-11 00:00:00", None, None, 101),
(128, "2019-02-13 00:00:00", "2019-02-11 00:00:00", "2019-02-11 00:00:00", 168),
(128, "2019-02-14 00:00:00", "2019-02-13 00:00:00", "2019-02-20 00:00:00", 187)
]).toDF(["Id", "DateActual", "DateStart", "DateEnd", "SourceCode"])
)
这是我尝试过的:
from pyspark.sql.functions import col, when
import pyspark.sql.functions as F
from pyspark.sql.window import Window
my_window = Window.partitionBy("Id").orderBy("DateActual")
df.withColumn("DateStart_start", when(col("DateStart").isNull(), F.lag(df.DateStart).over(my_window)).otherwise(col("DateStart"))).show()
我不需要像df.na.fill(0)
这样的简单解决方案。我需要用 NEXT ROW 值替换 null
值,这可能假设使用 lag
或其他类似函数。
【问题讨论】:
我猜他们对你的问题投了反对票,因为你的问题已经存在于 SO 中。您应该首先搜索以查看您的问题是否已被提出。如果是这样,最好避免发布多余的问题。这篇文章是否有机会回答您的问题? ***.com/questions/42312042/… @alift:请在投票前仔细阅读问题。在您推荐的帖子中,df.na.fill(0)
用 0 填充 Null 值。这太微不足道了。我需要遵循我帖子中描述的逻辑。
【参考方案1】:
使用来自pyspark.sql.functions
的first
:
from pyspark.sql import Window
from pyspark.sql.functions import first
# define the window
window = Window.partitionBy('Id')\
.orderBy('DateActual')\
.rowsBetween(0,sys.maxsize)
# define the back-filled column
filled_column_start = first(spark_df['DateStart'], ignorenulls=True).over(window)
filled_column_end = first(spark_df['DateEnd'], ignorenulls=True).over(window)
# do the fill
spark_df_filled = spark_df.withColumn('filled_start', filled_column_start)
spark_df_filled = spark_df_filled .withColumn('filled_end', filled_column_end)
# show off our glorious achievements
spark_df_filled.orderBy('Id').show(10)
【讨论】:
我认为问题的关键词是在每个分区中回填空值。您可以修改您的帖子标题和正文以使其更加清晰。这样你的问题可能会帮助很多人。@Fluxy以上是关于如何在 PySpark 中的每个分区中回填空值的主要内容,如果未能解决你的问题,请参考以下文章
在 PySpark RDD 中,如何使用 foreachPartition() 打印出每个分区的第一条记录?