Spark.sql 按 MAX 过滤行

Posted

技术标签:

【中文标题】Spark.sql 按 MAX 过滤行【英文标题】:Spark.sql Filter rows by MAX 【发布时间】:2022-01-10 08:09:52 【问题描述】:

以下是您可以想象的更大的源文件的一部分:

date,code1,postcode,cityname,total
2020-03-27,2011,X700,Curepipe,44
2020-03-29,2011,X700,Curepipe,44
2020-03-26,2011,X700,Curepipe,22
2020-03-27,2035,X920,vacoas,3
2020-03-25,2011,X920,vacoas,1
2020-03-24,2122,X760,souillac,22
2020-03-23,2122,X760,souillac,11
2020-03-22,2257,X760,souillac,10
2020-03-27,2480,X510,rosehill,21
2020-03-22,2035,X510,rosehill,7
2020-03-20,2035,X510,rosehill,3

以下代码后:

#Load data
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("source").getOrCreate()
dfcases = spark.read.format("csv").option("header", "true").load("sourcefile.csv")

dfcases.createOrReplaceTempView("tablecases")
spark.sql(XXXXXXXXXXXXX).show() #mysql code to insert

我想得到这个结果:

Curepipe,X700,2020-03-27,44
Curepipe,X700,2020-03-29,44
souillac,X760,2020-03-24,22
rosehill,X510,2020-03-27,21
vacoas,X920,2020-03-27,3

我们的目标是:

    选择每个城市名称的总和最大的日期(注意,如果一个城市在 2 个不同的日期有 MAX 总和,则一个城市可以出现两次), 按总降序、日期升序、城市名升序排序。

谢谢!

【问题讨论】:

是不是这个问题的SQL版本:***.com/questions/70181393/…? 【参考方案1】:

下面的查询生成你想要的输出

SELECT cityname, postcode, date, COUNT(*) AS total 
FROM tablecases 
GROUP BY cityname, postcode, date 
HAVING COUNT(*) > 1
ORDER BY COUNT(*) DESC, date, cityname

db<>fiddle中的演示

【讨论】:

是的,我明白了,这就是我使用“HAVING COUNT(*) > 1”的原因。您不需要带分组的 max 函数,并且后查询会产生所需的结果。 见dbfiddle.uk/… 是的,您可以编辑,但它会为您提供一个新链接 我已经编辑了帖子以使其更易于理解并更接近实际问题,谢谢!【参考方案2】:

您可以在请求中使用SQL window 获得结果,如下所示:

SELECT
  cityname, 
  postcode, 
  date, 
  total
FROM
 (SELECT 
    cityname, 
    postcode, 
    date, 
    total, 
    MAX(total) OVER (PARTITION BY cityname ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS max_total 
  FROM tablecases)
WHERE max_total = total
ORDER BY max_total DESC, date, cityname

【讨论】:

感谢文森特,PARTITION BY 工作,这就是我找到的解决方案:spark.sql("SELECT name, postcode, date, total FROM (SELECT *, MAX(total) OVER (PARTITION BY name) AS maxtotal FROM table2) WHERE total = maxtotal ORDER BY total DESC, date, name DESC")

以上是关于Spark.sql 按 MAX 过滤行的主要内容,如果未能解决你的问题,请参考以下文章

Spark Sql 查询嵌套记录。我想先过滤嵌套的记录数组,然后爆炸(将它们展开成行)

在 Spark 中,我无法按现有列进行过滤

Spark 过滤器未按预期工作。“列”对象不可调用

如何通过嵌套数组字段(数组中的数组)过滤Spark sql?

哪个更快? Spark SQL with Where 子句或在 Spark SQL 之后在 Dataframe 中使用过滤器

过滤时Spark sql“期货在300秒后超时”