Spark SQL 可以在 GROUP BY 聚合中使用 FIRST_VALUE 和 LAST_VALUE(但这不是标准的)
Posted
技术标签:
【中文标题】Spark SQL 可以在 GROUP BY 聚合中使用 FIRST_VALUE 和 LAST_VALUE(但这不是标准的)【英文标题】:Spark SQL can use FIRST_VALUE and LAST_VALUE in a GROUP BY aggregation (but it's not standard) 【发布时间】:2018-07-11 09:00:53 【问题描述】:(在 Spark 2.2 和 2.3 上测试)
我正在使用 Spark 将股票交易报价汇总到每日 OHLC(开盘-高-低-收)记录中。
输入的数据是这样的
val data = Seq(("2018-07-11 09:01:00", 34.0), ("2018-07-11 09:04:00", 32.0), ("2018-07-11 09:02:00", 35.0), ("2018-07-11 09:03:00", 30.0), ("2018-07-11 09:00:00", 33.0), ("2018-07-12 09:01:00", 56.0), ("2018-07-12 09:04:00", 54.0), ("2018-07-12 09:02:00", 51.0), ("2018-07-12 09:03:00", 50.0), ("2018-07-12 09:00:00", 51.0)).toDF("time", "price")
data.createOrReplaceTempView("ticks")
data.show
scala>
显示为
+-------------------+-----+
| time|price|
+-------------------+-----+
|2018-07-11 09:01:00| 34.0|
|2018-07-11 09:04:00| 32.0|
|2018-07-11 09:02:00| 35.0|
|2018-07-11 09:03:00| 30.0|
|2018-07-11 09:00:00| 33.0|
|2018-07-12 09:01:00| 56.0|
|2018-07-12 09:04:00| 54.0|
|2018-07-12 09:02:00| 51.0|
|2018-07-12 09:03:00| 50.0|
|2018-07-12 09:00:00| 51.0|
+-------------------+-----+
想要的输出是
+----------+----+----+----+-----+
| date|open|high| low|close|
+----------+----+----+----+-----+
|2018-07-11|33.0|35.0|30.0| 32.0|
|2018-07-12|51.0|56.0|50.0| 54.0|
+----------+----+----+----+-----+
已经有this和this等多种SQL解决方案。
SELECT
TO_DATE(time) AS date,
FIRST_VALUE(price) OVER (PARTITION BY TO_DATE(time) ORDER BY time) AS open,
MAX(price) OVER (PARTITION BY TO_DATE(time) ORDER BY time) AS high,
MIN(price) OVER (PARTITION BY TO_DATE(time) ORDER BY time) AS low,
LAST_VALUE(price) OVER (PARTITION BY TO_DATE(time) ORDER BY time) AS close
FROM ticks
由于SQL的限制,这些方案比较繁琐。
今天,我发现 Spark SQL 可以在 GROUP BY
上下文中使用 FIRST_VALUE
和 LAST_VALUE
which is not allowed in standard SQL。
Spark SQL 的这种无限性衍生出一个整洁的解决方案,如下所示:
SELECT
TO_DATE(time) AS date,
FIRST_VALUE(price) AS open,
MAX(price) AS high,
MIN(price) AS low,
LAST_VALUE(price) AS close
FROM ticks
GROUP BY TO_DATE(time)
你可以试试
spark.sql("SELECT TO_DATE(time) AS date, FIRST(price) AS open, MAX(price) AS high, MIN(price) AS low, LAST(price) AS close FROM ticks GROUP BY TO_DATE(time)").show
scala>
显示为
+----------+----+----+----+-----+
| date|open|high| low|close|
+----------+----+----+----+-----+
|2018-07-11|34.0|35.0|30.0| 33.0|
|2018-07-12|56.0|56.0|50.0| 51.0|
+----------+----+----+----+-----+
但是,上面的结果是不正确的。 (请与上述预期结果进行比较。)
FIRST_VALUE
和 LAST_VALUE
需要确定性排序才能获得确定性结果。
我可以通过在分组前添加orderBy
来纠正它。
import org.apache.spark.sql.functions._
data.orderBy("time").groupBy(expr("TO_DATE(time)").as("date")).agg(first("price").as("open"), max("price").as("high"), min("price").as("low"), last("price").as("close")).show
scala>
显示为
+----------+----+----+----+-----+
| date|open|high| low|close|
+----------+----+----+----+-----+
|2018-07-11|33.0|35.0|30.0| 32.0|
|2018-07-12|51.0|56.0|50.0| 54.0|
+----------+----+----+----+-----+
根据需要正确!!!
我的问题是,上面的代码“orderBy then groupBy”是否有效?这个订购有保证吗?我们可以在严肃的作品中使用这种非标准功能吗?
这个问题的重点是,在标准 SQL 中,我们只能先执行 GROUP BY
然后 ORDER BY
对聚合进行排序,而不是 ORDER BY
然后 GROUP BY
。
GROUP BY
将忽略 ORDER BY
的顺序。
我也想知道 Spark SQL 是否可以在所需的顺序下执行这样的GROUP BY
,标准 SQL 是否也可以为此发明这样的语法?
附言
我可以想到一些依赖于确定性排序的聚合函数。
WITH ORDER BY time SELECT COLLECT_LIST(price) GROUP BY stockID
WITH ORDER BY time SELECT SUM(SQUARE(price - LAG(price, 1, 0))) GROUP BY stockID
如果没有WITH ORDER BY time
,我们如何在标准 SQL 中对 COLLECTed_LIST 进行排序?
这些例子表明“GROUP BY
under desired ordering”仍然有用。
【问题讨论】:
我想知道如果我们可以这样使用它,提供first_value, last_value
有什么意义,如果不是这样,预期的用例是什么? :(
如果某个查询结果是有序的,并且它的分组是保序的,那么我们可以一次获得first_value,last_value,min,max,sum,count ...。不需要多个 partition-by 语句。我相信 spark 的 orderby+groupby 是保序的,因为我在上面的例子中得到了正确的结果。
至于用例,我在 P.S. 中展示了 2:COLLECT_LIST
和 price - LAG(price)
。我的问题的一部分。真正的预期用例是真正为股票价格计算 OHLC。
【参考方案1】:
groupBy/agg 中的排序不保证,可以使用窗口函数,按键分区和时间排序
【讨论】:
我没有选择你的答案,因为我无法判断你所说的“不保证”是否属实。也许有保证但没有记录?如果您提供更多证据,例如文档或源代码研究,我会选择您的答案。谢谢。 您提供的关于窗口功能的内容是我问题中超链接中已知的解决方案之一。我的问题是基于此。为了澄清,我刚刚编辑了我的问题以添加窗口 SQL 语句。以上是关于Spark SQL 可以在 GROUP BY 聚合中使用 FIRST_VALUE 和 LAST_VALUE(但这不是标准的)的主要内容,如果未能解决你的问题,请参考以下文章
org.apache.spark.sql.AnalysisException:表达式 't2.`sum_click_passed`' 既不在 group by 中,也不是聚合函数
Spark Window 聚合与 Group By/Join 性能