Flink SQL--- Over Aggregation
Posted 宝哥大数据
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink SQL--- Over Aggregation相关的知识,希望对你有一定的参考价值。
文章目录
一、Over 聚合介绍
Over 聚合定义(支持 Batch\\Streaming):可以理解为是一种特殊的滑动窗口聚合函数。那这里我们拿 Over 聚合 与 窗口聚合 做一个对比,其之间的最大不同之处在于:
- 窗口聚合:不在 group by 中的字段,不能直接在 select 中拿到;
- Over 聚合:能够保留原始字段.在生产环境中,Over 聚合的使用场景还是比较少的。
GROUP BY是针对每个group产生一个aggregate value,而OVER则是对每一个row产生一个aggregate value
Over 聚合的语法总结如下:
SELECT
agg_func(agg_col) OVER (
[PARTITION BY col1[, col2, ...]]
ORDER BY time_col
range_definition),
...
FROM ...
其中:
- ORDER BY:必须是时间戳列(事件时间、处理时间)
- PARTITION BY:标识了聚合窗口的聚合粒度,如上述案例是按照 product 进行聚合
- range_definition:这个标识聚合窗口的聚合数据范围,在 Flink 中有两种指定数据范围的方式。
- 第一种为按照行数聚合,
- 第二种为按照时间区间聚合。
可以在一个SELECT子句中定义多个OVER窗口聚合。但是,对于流查询,由于当前的限制,所有聚合的OVER窗口必须相同。
二、案例
数据源
-- 源数据表
CREATE TABLE orders (
order_id BIGINT,
product BIGINT,
amount BIGINT,
order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.order_id.min' = '1',
'fields.order_id.max' = '2',
'fields.amount.min' = '1',
'fields.amount.max' = '10',
'fields.product.min' = '1',
'fields.product.max' = '2'
);
-- 结果表
CREATE TABLE sink_table (
product BIGINT,
order_time TIMESTAMP(3),
amount BIGINT,
one_hour_prod_amount_sum BIGINT
) WITH (
'connector' = 'print'
);
2.1、行数聚合
按照行数聚合就是数据行数的一个滑动窗口,比如下面案例,最新输出的一条数据的 sum 聚合结果就是最近 5 行数据的 amount 之和。
INSERT INTO sink_table
SELECT product, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
-- 标识统计范围是一个 product 的最近 5 行数据
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
) AS one_hour_prod_amount_sum
FROM orders;
flink web控制台查看结果
2.2、时间聚合
按照时间区间聚合就是时间区间的一个滑动窗口,比如下面案例 1 小时的区间,最新输出的一条数据的 sum 聚合结果就是最近一小时数据的 amount 之和。
INSERT INTO sink_table
SELECT product, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
-- 标识统计范围是一个 product 的最近 1 小时的数据
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
) AS one_hour_prod_amount_sum
FROM orders;
2.3、在一个 SELECT 中有多个聚合窗口的聚合方式
Flink SQL 支持了一种简化写法,如下案例:
SELECT order_id, order_time, amount,
SUM(amount) OVER w AS sum_amount,
AVG(amount) OVER w AS avg_amount
FROM orders
-- 使用下面子句,定义 Over Window
WINDOW w AS (
PARTITION BY product
ORDER BY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)
参考:
hive开窗函数-- over()
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/over-agg/
以上是关于Flink SQL--- Over Aggregation的主要内容,如果未能解决你的问题,请参考以下文章
flink笔记16 flink table windows(Group Windows/Over Windows)
flink笔记16 flink table windows(Group Windows/Over Windows)
Flink 极简教程: 架构及原理 Apache Flink® — Stateful Computations over Data Streams
Flink SQL ---Top-N ,Window Top-N
Apache Flink®极简教程: 架构及原理 Stateful Computations over Data Streams