Spark SQL window function range boundaries with condition

Posted: 2020-01-16 12:32:45

Question: My data looks like this:
+--------+--------+----+
|Sequence| type   | sg |
+--------+--------+----+
|       1| Pump   |  3 |
|       2| Pump   |  2 |
|       3| Inject |  4 |
|       4| Pump   |  5 |
|       5| Pump   |  3 |
|       6| pump   |  6 |
|       7| Inject |  7 |
|       8| Inject |  8 |
|       9| Pump   |  9 |
+--------+--------+----+
I want to add a new column based on the previous row's type value.

If the previous row's type is Pump, the new column should hold that row's sg value.

If it is Inject, the new column should hold the sum of the sg values of all preceding rows, going back until a row with type = Pump is found (that row's sg is included in the sum).

Examples:

For Sequence = 2, the previous row's type is Pump, so the new column's value should be that row's sg value: 3.

For Sequence = 9, the previous row's type is Inject, so the new column's value is the sum of the sg values of the previous three rows, because the row with Sequence = 6 is the first preceding row with type = Pump. The new value is 8 + 7 + 6 = 21.
The final output should look like this:

+--------+--------+----+--------+
|Sequence| type   | sg | New sg |
+--------+--------+----+--------+
|       1| Pump   |  3 |      - |
|       2| Pump   |  2 |      3 |
|       3| Inject |  4 |      2 |
|       4| Pump   |  5 |      6 |
|       5| Pump   |  3 |      5 |
|       6| pump   |  6 |      3 |
|       7| Inject |  7 |      6 |
|       8| Inject |  8 |      7 |
|       9| Pump   |  9 |     21 |
+--------+--------+----+--------+
Answer 1: Given your rules, this is just a bunch of window functions. The trick is to aggregate each 'Pump' value together with the 'Inject' rows that follow it, as one group; a cumulative sum over the 'Pump' rows identifies the groups.

The query is then:
select t.*,
       (case when prev_type = 'Pump' then prev_sg
             else lag(pump_sg) over (order by sequence)
        end) as your_value
from (select t.*,
             sum(sg) over (partition by pump_grp) as pump_sg
      from (select t.*,
                   lag(sg) over (order by sequence) as prev_sg,
                   lag(type) over (order by sequence) as prev_type,
                   sum(case when type = 'Pump' then 1 else 0 end) over (order by sequence) as pump_grp
            from t
           ) t
     ) t;
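The window logic can be sanity-checked outside Spark with a plain-Python simulation (a sketch only, not Spark code; it assumes the sample table above and matches 'Pump' case-insensitively, since the data mixes 'Pump' and 'pump'):

```python
# Simulation of the answer's windowed query over the sample data.
rows = [
    (1, "Pump", 3), (2, "Pump", 2), (3, "Inject", 4),
    (4, "Pump", 5), (5, "Pump", 3), (6, "pump", 6),
    (7, "Inject", 7), (8, "Inject", 8), (9, "Pump", 9),
]

# pump_grp: sum(case when type = 'Pump' then 1 else 0 end) over (order by sequence)
pump_grp, grp = [], 0
for _, typ, _ in rows:
    grp += typ.lower() == "pump"
    pump_grp.append(grp)

# pump_sg: sum(sg) over (partition by pump_grp)
grp_sum = {}
for g, (_, _, sg) in zip(pump_grp, rows):
    grp_sum[g] = grp_sum.get(g, 0) + sg
pump_sg = [grp_sum[g] for g in pump_grp]

# case when prev_type = 'Pump' then prev_sg else lag(pump_sg) end
new_sg = [None]
for i in range(1, len(rows)):
    _, prev_type, prev_sg = rows[i - 1]
    if prev_type.lower() == "pump":
        new_sg.append(prev_sg)
    else:
        new_sg.append(pump_sg[i - 1])

print(new_sg)  # [None, 3, 2, 6, 5, 3, 6, 21, 21]
```

This reproduces the worked examples from the question (Sequence 2 → 3, Sequence 9 → 21). Note that Sequence 8 comes out as 21 rather than the 7 shown in the expected table, because pump_sg sums the entire group, including Inject rows that come after the lagged one.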
I think your rules are overly complicated, and you don't need the special case for the previous row being a 'Pump'. So:
select t.*,
       lag(pump_sg) over (order by sequence) as your_value
from (select t.*,
             sum(sg) over (partition by pump_grp) as pump_sg
      from (select t.*,
                   sum(case when type = 'Pump' then 1 else 0 end) over (order by sequence) as pump_grp
            from t
           ) t
     ) t;
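The same plain-Python simulation applied to this simplified query (again a sketch under the same assumptions: sample data as above, case-insensitive 'Pump' matching):

```python
# Simulation of the simplified query: the new value is just lag(pump_sg),
# with no special case for a preceding 'Pump' row.
rows = [
    (1, "Pump", 3), (2, "Pump", 2), (3, "Inject", 4),
    (4, "Pump", 5), (5, "Pump", 3), (6, "pump", 6),
    (7, "Inject", 7), (8, "Inject", 8), (9, "Pump", 9),
]

# pump_grp: running count of 'Pump' rows, as in the inner query
pump_grp, grp = [], 0
for _, typ, _ in rows:
    grp += typ.lower() == "pump"
    pump_grp.append(grp)

# pump_sg: total sg per pump_grp partition
grp_sum = {}
for g, (_, _, sg) in zip(pump_grp, rows):
    grp_sum[g] = grp_sum.get(g, 0) + sg
pump_sg = [grp_sum[g] for g in pump_grp]

# lag(pump_sg) over (order by sequence)
your_value = [None] + pump_sg[:-1]
print(your_value)  # [None, 3, 6, 6, 5, 3, 21, 21, 21]
```

Note that on this data the simplified form diverges from the special-cased query wherever the previous row is a Pump sharing its group with later rows (e.g. Sequence 3 gets 6 here instead of 2), so the two variants are not strictly equivalent.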
Comments:

Thanks, Gordon :) !!