Spark SQL 可以在 GROUP BY 聚合中使用 FIRST_VALUE 和 LAST_VALUE(但这不是标准的)

Posted

技术标签:

【中文标题】Spark SQL 可以在 GROUP BY 聚合中使用 FIRST_VALUE 和 LAST_VALUE(但这不是标准的)【英文标题】:Spark SQL can use FIRST_VALUE and LAST_VALUE in a GROUP BY aggregation (but it's not standard) 【发布时间】:2018-07-11 09:00:53 【问题描述】:

(在 Spark 2.2 和 2.3 上测试)

我正在使用 Spark 将股票交易报价汇总到每日 OHLC(开盘-高-低-收)记录中。

输入的数据是这样的

val data = Seq(("2018-07-11 09:01:00", 34.0), ("2018-07-11 09:04:00", 32.0), ("2018-07-11 09:02:00", 35.0), ("2018-07-11 09:03:00", 30.0), ("2018-07-11 09:00:00", 33.0), ("2018-07-12 09:01:00", 56.0), ("2018-07-12 09:04:00", 54.0), ("2018-07-12 09:02:00", 51.0), ("2018-07-12 09:03:00", 50.0), ("2018-07-12 09:00:00", 51.0)).toDF("time", "price")

data.createOrReplaceTempView("ticks")

data.show

scala>

显示为

+-------------------+-----+
|               time|price|
+-------------------+-----+
|2018-07-11 09:01:00| 34.0|
|2018-07-11 09:04:00| 32.0|
|2018-07-11 09:02:00| 35.0|
|2018-07-11 09:03:00| 30.0|
|2018-07-11 09:00:00| 33.0|
|2018-07-12 09:01:00| 56.0|
|2018-07-12 09:04:00| 54.0|
|2018-07-12 09:02:00| 51.0|
|2018-07-12 09:03:00| 50.0|
|2018-07-12 09:00:00| 51.0|
+-------------------+-----+

想要的输出是

+----------+----+----+----+-----+
|      date|open|high| low|close|
+----------+----+----+----+-----+
|2018-07-11|33.0|35.0|30.0| 32.0|
|2018-07-12|51.0|56.0|50.0| 54.0|
+----------+----+----+----+-----+

已经有this和this等多种SQL解决方案。

SELECT
    TO_DATE(time) AS date,
    FIRST_VALUE(price) OVER (PARTITION BY TO_DATE(time) ORDER BY time) AS open,
    MAX(price) OVER (PARTITION BY TO_DATE(time) ORDER BY time) AS high,
    MIN(price) OVER (PARTITION BY TO_DATE(time) ORDER BY time) AS low,
    LAST_VALUE(price) OVER (PARTITION BY TO_DATE(time) ORDER BY time) AS close
FROM ticks

由于SQL的限制,这些方案比较繁琐。

今天,我发现 Spark SQL 可以在 GROUP BY 上下文中使用 FIRST_VALUELAST_VALUE which is not allowed in standard SQL。

Spark SQL 的这种无限性衍生出一个整洁的解决方案,如下所示:

SELECT
    TO_DATE(time) AS date,
    FIRST_VALUE(price) AS open,
    MAX(price) AS high,
    MIN(price) AS low,
    LAST_VALUE(price) AS close
FROM ticks
GROUP BY TO_DATE(time)

你可以试试

spark.sql("SELECT TO_DATE(time) AS date, FIRST(price) AS open, MAX(price) AS high, MIN(price) AS low, LAST(price) AS close FROM ticks GROUP BY TO_DATE(time)").show

scala>

显示为

+----------+----+----+----+-----+
|      date|open|high| low|close|
+----------+----+----+----+-----+
|2018-07-11|34.0|35.0|30.0| 33.0|
|2018-07-12|56.0|56.0|50.0| 51.0|
+----------+----+----+----+-----+

但是,上面的结果是不正确的。 (请与上述预期结果进行比较。)

FIRST_VALUELAST_VALUE 需要确定性排序才能获得确定性结果。

我可以通过在分组前添加orderBy 来纠正它。

import org.apache.spark.sql.functions._

data.orderBy("time").groupBy(expr("TO_DATE(time)").as("date")).agg(first("price").as("open"), max("price").as("high"), min("price").as("low"), last("price").as("close")).show

scala>

显示为

+----------+----+----+----+-----+
|      date|open|high| low|close|
+----------+----+----+----+-----+
|2018-07-11|33.0|35.0|30.0| 32.0|
|2018-07-12|51.0|56.0|50.0| 54.0|
+----------+----+----+----+-----+

根据需要正确!!!

我的问题是,上面的代码“orderBy then groupBy”是否有效?这个订购有保证吗?我们可以在严肃的作品中使用这种非标准功能吗?

这个问题的重点是,在标准 SQL 中,我们只能先执行 GROUP BY 然后 ORDER BY 对聚合进行排序,而不是 ORDER BY 然后 GROUP BYGROUP BY 将忽略 ORDER BY 的顺序。

我也想知道 Spark SQL 是否可以在所需的顺序下执行这样的GROUP BY,标准 SQL 是否也可以为此发明这样的语法?

附言

我可以想到一些依赖于确定性排序的聚合函数。

WITH ORDER BY time SELECT COLLECT_LIST(price) GROUP BY stockID

WITH ORDER BY time SELECT SUM(SQUARE(price - LAG(price, 1, 0))) GROUP BY stockID

如果没有WITH ORDER BY time,我们如何在标准 SQL 中对 COLLECTed_LIST 进行排序?

这些例子表明“GROUP BY under desired ordering”仍然有用。

【问题讨论】:

我想知道如果我们可以这样使用它,提供first_value, last_value 有什么意义,如果不是这样,预期的用例是什么? :( 如果某个查询结果是有序的,并且它的分组是保序的,那么我们可以一次获得first_value,last_value,min,max,sum,count ...。不需要多个 partition-by 语句。我相信 spark 的 orderby+groupby 是保序的,因为我在上面的例子中得到了正确的结果。 至于用例,我在 P.S. 中展示了 2:COLLECT_LISTprice - LAG(price)。我的问题的一部分。真正的预期用例是真正为股票价格计算 OHLC。 【参考方案1】:

groupBy/agg 中的排序不保证,可以使用窗口函数,按键分区和时间排序

【讨论】:

我没有选择你的答案,因为我无法判断你所说的“不保证”是否属实。也许有保证但没有记录?如果您提供更多证据,例如文档或源代码研究,我会选择您的答案。谢谢。 您提供的关于窗口功能的内容是我问题中超链接中已知的解决方案之一。我的问题是基于此。为了澄清,我刚刚编辑了我的问题以添加窗口 SQL 语句。

以上是关于Spark SQL 可以在 GROUP BY 聚合中使用 FIRST_VALUE 和 LAST_VALUE(但这不是标准的)的主要内容,如果未能解决你的问题,请参考以下文章

org.apache.spark.sql.AnalysisException:表达式 't2.`sum_click_passed`' 既不在 group by 中,也不是聚合函数

Spark Window 聚合与 Group By/Join 性能

如何使用group by聚合spark中的结构数组

Spark Scala数据框具有单个Group By的多个聚合[重复]

spark Group By数据框列没有聚合[重复]

SQL中只要用到聚合函数就一定要用到group by 吗?