带有条件的 Spark SQL 窗口函数范围边界

Posted

技术标签:

【中文标题】带有条件的 Spark SQL 窗口函数范围边界【英文标题】:Spark SQL window function range boundaries with condition 【发布时间】:2020-01-16 12:32:45 【问题描述】:

我的数据如下所示:

         Sequence|       type      | sg       |
+-----------------+----------------+----------+
|              1| Pump             |3         |
|              2| Pump             |2         |
|              3| Inject           |4         |
|              4| Pump             |5         |
|              5| Pump             |3         | 
|              6| pump             |6         |
|              7| Inject           |7         |
|              8| Inject           |8         |
|              9| Pump             |9         |
+-----------------+----------------+----------+

我想添加一个新列并检查之前的type 值。

如果之前的type值为Pump,则将新列的值设置为对应的sg的值。

如果是inject,则获取所有先前行的sg 值的总和,直到找到具有Pump type 的行(其sg 值包含在总和中)。

前: 对于Sequence = 2,上一行的typePump,所以新列的值应该是对应的sg列的值:3。

对于Sequence = 9,前一行的typeInject,因此新列的值将是前三行的sg 列的总和,因为Sequence = 6 行是前一行的第一个带有a type = Pump。新列的值将是8 + 7 + 6 = 21

最终输出应该是这样的:

       Sequence|       type      | sg       |  New sg |
+-----------------+----------------+----------+--------+
|              1| Pump             |3         |-
|              2| Pump             |2         |3
|              3| Inject           |4         |2
|              4| Pump             |5         |6
|              5| Pump             |3         |5
|              6| pump             |6         |3
|              7| Inject           |7         |6
|              8| Inject           |8         |7
|              9| Pump             |9         |21
+-----------------+----------------+----------+

【问题讨论】:

【参考方案1】:

根据您的规则,这只是一堆窗口函数。诀窍是用“注入”按组聚合“泵”值。 “泵”的累积总和找到组。

那么查询是:

select t.*,
        (case when prev_type = 'Pump' then sg
              else lag(pump_sg) over (order by id)
         end) as your_value
from (select t.*,
             sum(sg) over (partition by pump_grp) as pump_sg
      from (select t.*,
                   lag(sg) over (order by id) as prev_sg,
                   lag(type) over (order by id) as prev_type,
                 sum(case when type = 'Pump' then 1 else 0 end) over (order by id) as pump_grp
            from t
           ) t
     ) t;

我认为您的规则过于复杂,并且您不需要将前一行作为“泵”的特殊情况。所以:

select t.*,
       lag(pump_sg) over (order by id) as your_value
from (select t.*,
             sum(sg) over (partition by pump_grp) as pump_sg
      from (select t.*,
                 sum(case when type = 'Pump' then 1 else 0 end) over (order by id) as pump_grp
            from t
           ) t
     ) t;

【讨论】:

谢谢戈登 :) !!.

以上是关于带有条件的 Spark SQL 窗口函数范围边界的主要内容,如果未能解决你的问题,请参考以下文章

具有复杂条件的 Spark SQL 窗口函数

具有复杂条件的 Spark SQL 窗口函数

jquery飘动的广告窗

Spark 窗口函数 - rangeBetween 日期

Spark 窗口函数 - rangeBetween 日期

当窗口/分区使用前向填充时,向 pyspark sql 中的 last() 函数添加条件