当窗口/分区使用前向填充时,向 pyspark sql 中的 last() 函数添加条件
Posted
技术标签:
【中文标题】当窗口/分区使用前向填充时,向 pyspark sql 中的 last() 函数添加条件【英文标题】:Add condition to last() function in pyspark sql when used by window/partition with forward filling 【发布时间】:2020-04-27 18:17:24 【问题描述】:我得到的代码来自这个链接: https://johnpaton.net/posts/forward-fill-spark/ 它有一些我想要完成的任务的背景。
from pyspark.sql import Window
from pyspark.sql.functions import last
# define the window
window = Window.partitionBy('location')\
.orderBy('time')\
.rowsBetween(-sys.maxsize, 0)
# define the forward-filled column
filled_column = last(spark_df['temperature'], ignorenulls=True).over(window)
# do the fill
spark_df_filled = spark_df.withColumn('temp_filled_spark', filled_column)
基本上,last()
函数用于查找最后一个非空值的状态。如果所有值都为 null,则返回 null。
但是,如果该组中的所有列都为空,我想分配一个默认值。我尝试了不同的方法,但无法弄清楚。
因此,基本上,如果某个位置的温度全部为空,我希望有一种方法可以将其设置为默认值。
Some examples:
I want to fill them with default values for the case below:
location temp temp
1 null 0
1 null =====> 0
1 null 0
I do not want to fill them with default values for the case below:
location temp temp
1 null null
1 50 ======> 50
1 60 60
【问题讨论】:
您能否展示一些具有预期输出的示例数据? @Vamsi Prabhala:刚刚添加了一个链接。谢谢! 如果所有值都为 Null,您要使用哪个默认值? @CPak,可能是0。这只是一个例子,实际上,我正在重新调用链接中的代码来完成类似的任务。例如,如果某个位置的所有温度值为空,我希望该位置的所有温度都设置为 0 而不是 null 【参考方案1】:也许您可以定义另一个列,以用作给定位置的任何记录是否包含非空值的指示符。例如:
window_2 = Window.partitionBy('location').rowsBetween(-sys.maxsize, sys.maxsize)
max_column = max(spark_df['temperature']).over(window_2)
然后,将该列与您的filled_column
一起使用,以有条件地填写最终结果:
temp_filled_spark = when(max_column.isNull(),0).otherwise(filled_column)
spark_df_filled = spark_df.withColumn('temp_filled_spark', temp_filled_spark)
可能不是很优雅或超级性能,但应该可以工作。
【讨论】:
感谢您提供建议。但是,这不是我的情况的用例。当所有温度值都为空时,我只想用默认值填充一个组。如果某些温度缺少初始状态但稍后具有有效温度,我不想填充这些初始状态。 我最初不知何故忽略了这个要求......现在更新了我的答案。以上是关于当窗口/分区使用前向填充时,向 pyspark sql 中的 last() 函数添加条件的主要内容,如果未能解决你的问题,请参考以下文章