使用 sql,spark 根据某些条件从表中获取输出
Posted
技术标签:
【中文标题】使用 sql,spark 根据某些条件从表中获取输出【英文标题】:fetching output from a table based on some conditions using sql,spark 【发布时间】:2020-07-18 08:53:16 【问题描述】:表格
item value month year
A 0 8 2020
B 0 8 2020
A 0 9 2020
B 13 9 2020
A 0 10 2020
B 0 10 2020
A 10 11 2020
B 0 11 2020
A 0 12 2020
B 0 12 2020
A 0 1 2021
B 10 1 2021
A 0 2 2021
B 0 2 2021
A 0 3 2021
B 0 3 2021
A 11 4 2021
B 0 4 2021
输出:
item month year
A 8 2020
A 12 2020
A 1 2021
B 10 2020
B 2 2021
输出规则: 对于一个item,month,year,我们需要寻找接下来两个月,如果接下来两个月的sum(value)为0,那么它将符合输出。
例如:对于项目 A 月 8 年 2020,总和(值)为 0,直到 2020 年 10 月。 同样,对于项目 A 月 9 年 2020 总和(值)在接下来的两个月内不为 0,因此不会输出。
【问题讨论】:
2020 年 12 月应与 2021 年起的第 1、2 和 3 个月合并。 【参考方案1】:IIUC,
以下答案将是您的方案的解决方案,
import os
import logging
from pyspark.sql import SQLContext,SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
#input feed
simpleData = [["A",0,8,2020],["B",0,8,2020],["A",0,9,2020],["B",13,9,2020],["A",0,10,2020],["B",0,10,2020],["A",10,11,2020],["B",0,11,2020],["A",0,12,2020],["B",0,12,2020],["A",0,1,2021],["B",10,1,2021],["A",0,2,2021],["B",0,2,2021],["A",0,3,2021],["B",0,3,2021],["A",11,4,2021],["B",0,4,2021]]
#creating a dataframe
cSchema = StructType([StructField("item", StringType()),StructField("value", IntegerType()),StructField("month", IntegerType()),StructField("year", IntegerType())])
df = spark.createDataFrame(simpleData,schema=cSchema)
df.show()
'''
+----+-----+-----+----+
|item|value|month|year|
+----+-----+-----+----+
| A| 0| 8|2020|
| B| 0| 8|2020|
| A| 0| 9|2020|
| B| 13| 9|2020|
| A| 0| 10|2020|
| B| 0| 10|2020|
| A| 10| 11|2020|
| B| 0| 11|2020|
| A| 0| 12|2020|
| B| 0| 12|2020|
| A| 0| 1|2021|
| B| 10| 1|2021|
| A| 0| 2|2021|
| B| 0| 2|2021|
| A| 0| 3|2021|
| B| 0| 3|2021|
| A| 11| 4|2021|
| B| 0| 4|2021|
+----+-----+-----+----+
'''
#registering the dataframe for spark SQL
df.createOrReplaceTempView("input_table");
df1=spark.sql("select * from input_table where item='A'")
df1.createOrReplaceTempView("a_table");
df1=spark.sql("select *,(SUM(value) over ( order by year,month rows between current row and 2 following )- value) sum from a_table")
df1=df1.filter((df1.sum==0) & (df1.value==0))
df2=spark.sql("select * from input_Table where item='B'")
df2.createOrReplaceTempView("b_table");
df2=spark.sql("select *,(SUM(value) over ( order by year,month rows between current row and 2 following )- value) sum from b_table")
df2=df2.filter((df2.sum==0) & (df2.value==0))
df1.union(df2).show()
'''
+----+-----+-----+----+---+
|item|value|month|year|sum|
+----+-----+-----+----+---+
| A| 0| 8|2020| 0|
| A| 0| 12|2020| 0|
| A| 0| 1|2021| 0|
| B| 0| 10|2020| 0|
| B| 0| 2|2021| 0|
| B| 0| 3|2021| 0|
| B| 0| 4|2021| 0|
+----+-----+-----+----+---+
'''
【讨论】:
请您回答以下问题。 ***.com/questions/62950700/… 确定@mradul,请投票接受这个答案【参考方案2】:对于一个item,month,year,我们需要寻找接下来两个月,如果接下来两个月的sum(value)为0,那么它就有资格输出。
这听起来像lead()
或使用窗框:
select t.*
from (select t.*,
sum(value) over (partition by item order by year, month rows between current row and 2 following) as sum_value_3
from t
) t
where sum_value_3;
这会返回比您指定的更多的行,因为您的描述和所需的结果不同。您真的想要一系列行中的第一行,其中连续有 3 个或更多 0
。解决此问题的一种方法是作为间隙和孤岛问题:
select item, year, month, cnt
from (select t.*,
count(*) over (partition by item, seqnum - seqnum_1) as cnt,
row_number() over (partition by item, seqnum - seqnum_1 order by year, month) as seqnum_within_group
from (select t.*,
row_number() over (partition by item order by year, month) as seqnum,
row_number() over (partition by item, (case when value = 0 then 1 else 0 end) order by year, month) as seqnum_1
from t
) t
where value = 0
) t
where seqnum_within_group = 1 and cnt >= 3;
Here 是一个 dbfiddle,它使用 Postgres。
【讨论】:
以上是关于使用 sql,spark 根据某些条件从表中获取输出的主要内容,如果未能解决你的问题,请参考以下文章