使用 sql,spark 根据某些条件从表中获取输出

Posted

技术标签:

【中文标题】使用 sql,spark 根据某些条件从表中获取输出【英文标题】:fetching output from a table based on some conditions using sql,spark 【发布时间】:2020-07-18 08:53:16 【问题描述】:

表格

item    value      month     year
A         0          8         2020
B         0          8         2020
A         0          9         2020
B         13          9         2020
A         0          10         2020
B         0          10         2020
A         10          11         2020
B         0          11         2020
A         0          12         2020
B         0          12         2020
A         0          1         2021
B         10          1         2021
A         0          2         2021
B         0          2         2021
A         0          3         2021
B         0          3         2021
A         11          4         2021
B         0          4         2021

输出:

item  month   year
A      8       2020
A      12       2020
A      1       2021
B      10       2020
B       2       2021

输出规则: 对于一个item,month,year,我们需要寻找接下来两个月,如果接下来两个月的sum(value)为0,那么它将符合输出。

例如:对于项目 A 月 8 年 2020,总和(值)为 0,直到 2020 年 10 月。 同样,对于项目 A 月 9 年 2020 总和(值)在接下来的两个月内不为 0,因此不会输出。

【问题讨论】:

2020 年 12 月应与 2021 年起的第 1、2 和 3 个月合并。 【参考方案1】:

IIUC,

以下答案将是您的方案的解决方案,

import os
import logging 
from pyspark.sql import SQLContext,SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *

#input feed
simpleData = [["A",0,8,2020],["B",0,8,2020],["A",0,9,2020],["B",13,9,2020],["A",0,10,2020],["B",0,10,2020],["A",10,11,2020],["B",0,11,2020],["A",0,12,2020],["B",0,12,2020],["A",0,1,2021],["B",10,1,2021],["A",0,2,2021],["B",0,2,2021],["A",0,3,2021],["B",0,3,2021],["A",11,4,2021],["B",0,4,2021]]

#creating a dataframe
cSchema = StructType([StructField("item", StringType()),StructField("value", IntegerType()),StructField("month", IntegerType()),StructField("year", IntegerType())])
df = spark.createDataFrame(simpleData,schema=cSchema)
df.show()

'''
+----+-----+-----+----+
|item|value|month|year|
+----+-----+-----+----+
|   A|    0|    8|2020|
|   B|    0|    8|2020|
|   A|    0|    9|2020|
|   B|   13|    9|2020|
|   A|    0|   10|2020|
|   B|    0|   10|2020|
|   A|   10|   11|2020|
|   B|    0|   11|2020|
|   A|    0|   12|2020|
|   B|    0|   12|2020|
|   A|    0|    1|2021|
|   B|   10|    1|2021|
|   A|    0|    2|2021|
|   B|    0|    2|2021|
|   A|    0|    3|2021|
|   B|    0|    3|2021|
|   A|   11|    4|2021|
|   B|    0|    4|2021|
+----+-----+-----+----+
'''

#registering the dataframe for spark SQL
df.createOrReplaceTempView("input_table");

df1=spark.sql("select * from input_table where item='A'")
df1.createOrReplaceTempView("a_table");
df1=spark.sql("select *,(SUM(value) over ( order by year,month rows between current row and 2 following )- value) sum from a_table")
df1=df1.filter((df1.sum==0) & (df1.value==0))

df2=spark.sql("select * from input_Table where item='B'")
df2.createOrReplaceTempView("b_table");
df2=spark.sql("select *,(SUM(value) over ( order by year,month rows between current row and 2 following )- value) sum from b_table")
df2=df2.filter((df2.sum==0) & (df2.value==0))

df1.union(df2).show()
'''
+----+-----+-----+----+---+
|item|value|month|year|sum|
+----+-----+-----+----+---+
|   A|    0|    8|2020|  0|
|   A|    0|   12|2020|  0|
|   A|    0|    1|2021|  0|
|   B|    0|   10|2020|  0|
|   B|    0|    2|2021|  0|
|   B|    0|    3|2021|  0|
|   B|    0|    4|2021|  0|
+----+-----+-----+----+---+
'''

【讨论】:

请您回答以下问题。 ***.com/questions/62950700/… 确定@mradul,请投票接受这个答案【参考方案2】:

对于一个item,month,year,我们需要寻找接下来两个月,如果接下来两个月的sum(value)为0,那么它就有资格输出。

这听起来像lead() 或使用窗框:

select t.*
from (select t.*,
             sum(value) over (partition by item order by year, month rows between current row and 2 following) as sum_value_3
      from t
     ) t
where sum_value_3;

这会返回比您指定的更多的行,因为您的描述和所需的结果不同。您真的想要一系列行中的第一行,其中连续有 3 个或更多 0。解决此问题的一种方法是作为间隙和孤岛问题:

select item, year, month, cnt
from (select t.*,
             count(*) over (partition by item, seqnum - seqnum_1) as cnt,
             row_number() over (partition by item, seqnum - seqnum_1 order by year, month) as seqnum_within_group
      from (select t.*,
                   row_number() over (partition by item order by year, month) as seqnum,
                   row_number() over (partition by item, (case when value = 0 then 1 else 0 end) order by year, month) as seqnum_1
            from t
           ) t
      where value = 0
     ) t
where seqnum_within_group = 1 and cnt >= 3;

Here 是一个 dbfiddle,它使用 Postgres。

【讨论】:

以上是关于使用 sql,spark 根据某些条件从表中获取输出的主要内容,如果未能解决你的问题,请参考以下文章

SQL 语法从表中选择某些内容,如果结果不存在,则将结果插入新表中

如何也使用 where 条件从表中获取列名

SnowFlake 存储过程根据条件从表中删除一行

SQL 从表中选择并获取表名

从表中提取字符串以在查询中使用

从表中选择所有或仅特定的行