如何获取每个分区的最后一个值以在 Spark SQL 中估算缺失值
Posted
技术标签:
【中文标题】如何获取每个分区的最后一个值以在 Spark SQL 中估算缺失值【英文标题】:How to get last value for every partition to impute missing value in spark SQL 【发布时间】:2020-12-01 22:40:14 【问题描述】:我有一个样本数据,我想在其中估算缺失值。缺少数据的行由blank
表示。这是示例数据-
val my_df = spark.sql(s"""
select 1 as id, 1 as time_gmt, 'a' as pagename
union
select 1 as id, 2 as time_gmt, 'b' as pagename
union
select 1 as id, 3 as time_gmt, 'blank' as pagename
union
select 1 as id, 4 as time_gmt, 'blank' as pagename
union
select 1 as id, 5 as time_gmt, 'd' as pagename
union
select 2 as id, 1 as time_gmt, 'c' as pagename
union
select 2 as id, 2 as time_gmt, 'a' as pagename
union
select 2 as id, 3 as time_gmt, 'c' as pagename
union
select 2 as id, 4 as time_gmt, 'blank' as pagename
union
select 2 as id, 5 as time_gmt, 'd' as pagename
""")
my_df.createOrReplaceTempView("my_df")
scala> my_df.orderBy("id","time_gmt").show(false)
+---+--------+--------+
|id |time_gmt|pagename|
+---+--------+--------+
|1 |1 |a |
|1 |2 |b |
|1 |3 |blank |
|1 |4 |blank |
|1 |5 |d |
|2 |1 |c |
|2 |2 |a |
|2 |3 |c |
|2 |4 |blank |
|2 |5 |d |
+---+--------+--------+
如您所见,id 为 1 的数据有 2 个空白,id 为 2 的数据有 1 个空白。我想使用每个 ID 观察到的最新非空白值填充这些值,按time_gmt
排序柱子。所以我的输出是 -
+---+--------+--------+----------------+
|id |time_gmt|pagename|pagename_imputed|
+---+--------+--------+----------------+
|1 |1 |a | a |
|1 |2 |b | b |
|1 |3 |blank | b |
|1 |4 |blank | b |
|1 |5 |d | d |
|2 |1 |c | c |
|2 |2 |a | a |
|2 |3 |c | c |
|2 |4 |blank | c |
|2 |5 |d | d |
+---+--------+--------+----------------+
如何在 spark SQL 中做到这一点?
注意 - 在非空白值之后,每个分区的空白可以出现多次。
【问题讨论】:
【参考方案1】:一个选项使用窗口函数。这个想法是定义记录组,其中“空白”记录将与最后一个非空白记录属于同一组。
假设 blank 你的意思是null
,我们可以定义具有窗口计数的组:
select id, time_gmt,
max(pagename) over(partition by id, grp) as pagename
from (
select t.*,
count(pagename) over(partition by id order by time_gmt) as grp
from mytable t
) t
如果你的意思是字符串'blank'
,那么:
select id, time_gmt,
max(case when pagename <> 'blank' then pagename end) over(partition by id, grp) as pagename
from (
select t.*,
sum(case when pagename = 'blank' then 0 else 1 end) over(partition by id order by time_gmt) as grp
from mytable t
) t
【讨论】:
嗨,感谢您的解决方案。是的,我的意思是字符串“空白”。但是当我使用你的第二个解决方案时,我无法得到预期的答案。只有 ID 2 被估算,而 ID 在您的查询后有 3 条空白记录 @Regressor:是的,我看到了问题所在。外部窗口功能也需要进行调整以正确处理“空白”。完成。 基于sum()..
函数创建群组的绝佳解决方案。谢谢。【参考方案2】:
针对此类用例有一个特殊的窗口函数last(expr, [IgnoreNulls]) over()
:
select id, time_gmt, last(nullif(pagename, 'blank'), true) over(partition by id order by time_gmt) as pagename
from my_df
https://spark.apache.org/docs/latest/api/sql/index.html#last
【讨论】:
以上是关于如何获取每个分区的最后一个值以在 Spark SQL 中估算缺失值的主要内容,如果未能解决你的问题,请参考以下文章
从 xxx.plist 获取值以在 Target 中构建设置
如何从 rxjs 流中过滤单个值以在 Angular 2 组件中打印
如何使用 PyQt5 QThread 从 CLASS(ShowVideo) 中获取值以在第二个 CLASS(ImageViewer) 中使用(根据我的程序)
将浮点值存储在数组中,并分析最后 20 个值以在 C++ 中检查