How to get the last value for every partition to impute missing values in Spark SQL

Posted: 2020-12-01 22:40:14

【Question】:

I have sample data in which I want to impute missing values. Rows with missing data are marked with the string blank. Here is the sample data:

val my_df = spark.sql(s"""
select 1 as id, 1 as time_gmt, 'a' as pagename
union
select 1 as id, 2 as time_gmt, 'b' as pagename
union
select 1 as id, 3 as time_gmt, 'blank' as pagename
union
select 1 as id, 4 as time_gmt, 'blank' as pagename
union
select 1 as id, 5 as time_gmt, 'd' as pagename
union
select 2 as id, 1 as time_gmt, 'c' as pagename
union
select 2 as id, 2 as time_gmt, 'a' as pagename
union
select 2 as id, 3 as time_gmt, 'c' as pagename
union
select 2 as id, 4 as time_gmt, 'blank' as pagename
union
select 2 as id, 5 as time_gmt, 'd' as pagename
""")
my_df.createOrReplaceTempView("my_df")

scala> my_df.orderBy("id","time_gmt").show(false)
+---+--------+--------+
|id |time_gmt|pagename|
+---+--------+--------+
|1  |1       |a       |
|1  |2       |b       |
|1  |3       |blank   |
|1  |4       |blank   |
|1  |5       |d       |
|2  |1       |c       |
|2  |2       |a       |
|2  |3       |c       |
|2  |4       |blank   |
|2  |5       |d       |
+---+--------+--------+

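As you can see, the data for id 1 has 2 blanks and the data for id 2 has 1 blank. I want to fill each of them with the most recent non-blank value observed for the same id, ordered by the time_gmt column. So my expected output is: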

+---+--------+--------+----------------+
|id |time_gmt|pagename|pagename_imputed|
+---+--------+--------+----------------+
|1  |1       |a       |a               |
|1  |2       |b       |b               |
|1  |3       |blank   |b               |
|1  |4       |blank   |b               |
|1  |5       |d       |d               |
|2  |1       |c       |c               |
|2  |2       |a       |a               |
|2  |3       |c       |c               |
|2  |4       |blank   |c               |
|2  |5       |d       |d               |
+---+--------+--------+----------------+

How can I do this in Spark SQL?

Note: within a partition, several blanks can occur in a row after a non-blank value.

【Answer 1】:

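One option uses window functions. The idea is to define groups of records so that each "blank" record falls into the same group as the last non-blank record before it.

Assuming that by blank you mean null, we can define the groups with a window count, because count(pagename) ignores nulls and therefore only increases on non-null rows: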

select id, time_gmt, 
    max(pagename) over(partition by id, grp) as pagename
from (
    select t.*, 
        count(pagename) over(partition by id order by time_gmt) as grp
    from my_df t
) t

If you mean the string 'blank', then:

select id, time_gmt, 
    max(case when pagename <> 'blank' then pagename end) over(partition by id, grp) as pagename
from (
    select t.*, 
        sum(case when pagename = 'blank' then 0 else 1 end) over(partition by id order by time_gmt) as grp
    from my_df t
) t
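
To make the grouping idea concrete, here is a minimal sketch in plain Scala collections (no Spark involved) that mimics the two steps of the query above. The names Row and imputeByGroup are hypothetical and exist only for this illustration; None stands in for the SQL NULL that a leading blank would produce.

```scala
// Illustration only: mimics the SQL above with plain Scala collections, not Spark.
// `Row` and `imputeByGroup` are hypothetical names introduced for this sketch.
case class Row(id: Int, timeGmt: Int, pagename: String)

def imputeByGroup(rows: Seq[Row]): Seq[(Row, Option[String])] =
  rows.groupBy(_.id).toSeq.sortBy(_._1).flatMap { case (_, part) =>
    val ordered = part.sortBy(_.timeGmt)
    // Step 1 (inner query): running count of non-blank rows acts as the group number `grp`.
    val grps = ordered
      .scanLeft(0)((acc, r) => if (r.pagename == "blank") acc else acc + 1)
      .tail
    val tagged = ordered.zip(grps)
    // Step 2 (outer query): each group holds exactly one non-blank value; look it up.
    val valueOfGrp: Map[Int, String] = tagged.collect {
      case (r, g) if r.pagename != "blank" => g -> r.pagename
    }.toMap
    // Blanks before the first non-blank value land in group 0 and stay None.
    tagged.map { case (r, g) => (r, valueOfGrp.get(g)) }
  }

// Sample data from the question.
val sample = Seq(
  Row(1, 1, "a"), Row(1, 2, "b"), Row(1, 3, "blank"), Row(1, 4, "blank"), Row(1, 5, "d"),
  Row(2, 1, "c"), Row(2, 2, "a"), Row(2, 3, "c"), Row(2, 4, "blank"), Row(2, 5, "d")
)
val imputed = imputeByGroup(sample)
```

For id 1 the running count is 1, 2, 2, 2, 3, so both blank rows share group 2 with 'b' and pick up its value.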

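【Comments】:

Hi, thanks for the solution. Yes, I mean the string 'blank'. But when I use your second solution, I cannot get the expected answer. Only ID 2 gets imputed, while ID has 3 blank records after your query.

@Regressor: Yes, I see the problem. The outer window function also needs to be adjusted to handle 'blank' correctly. Done.

Excellent solution, creating the groups based on the sum().. function. Thanks.

【Answer 2】: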

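There is a dedicated window function for this kind of use case: last(expr, [IgnoreNulls]) over(). Here nullif turns 'blank' into null, and the second argument true makes last skip those nulls: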

select id, time_gmt, last(nullif(pagename, 'blank'), true) over(partition by id order by time_gmt) as pagename
from my_df

https://spark.apache.org/docs/latest/api/sql/index.html#last
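
As a rough illustration of what the query above computes, the sketch below reproduces the semantics in plain Scala, without Spark. It assumes the default window frame that applies when order by is given without an explicit frame (from the start of the partition up to the current row). The helper name forwardFill is hypothetical, and None plays the role of the null produced by nullif(pagename, 'blank').

```scala
// Illustration only: plain Scala, not Spark.
// `forwardFill` is a hypothetical helper; None stands in for SQL NULL.
def forwardFill(values: Seq[Option[String]]): Seq[Option[String]] =
  values
    // Carry the last non-null value forward; a non-null current value replaces it.
    .scanLeft(Option.empty[String])((lastSeen, v) => v.orElse(lastSeen))
    .tail

// pagename values for id 1, already ordered by time_gmt; 'blank' becomes None.
val id1 = Seq("a", "b", "blank", "blank", "d")
  .map(p => if (p == "blank") None else Some(p))

val filledId1 = forwardFill(id1)
```

Unlike the grouping approach in the first answer, this needs only a single pass over each partition, which is why the one-line last(..., true) query is enough.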
