Flink SQL--- Over Aggregation

Posted 宝哥大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink SQL--- Over Aggregation相关的知识,希望对你有一定的参考价值。

文章目录

一、Over 聚合介绍

Over 聚合定义(支持 Batch\\Streaming):可以理解为是一种特殊的滑动窗口聚合函数。那这里我们拿 Over 聚合​窗口聚合 做一个对比,其之间的最大不同之处在于:

  • 窗口聚合:不在 group by 中的字段,不能直接在 select 中拿到;
  • Over 聚合:能够保留原始字段.在生产环境中,Over 聚合的使用场景还是比较少的。

GROUP BY是针对每个group产生一个aggregate value,而OVER则是对每一个row产生一个aggregate value

Over 聚合的语法总结如下:

SELECT
  agg_func(agg_col) OVER (
    [PARTITION BY col1[, col2, ...]]
    ORDER BY time_col
    range_definition),
  ...
FROM ...

其中:

  • ORDER BY:必须是时间戳列(事件时间、处理时间)
  • PARTITION BY:标识了聚合窗口的聚合粒度,如上述案例是按照 product 进行聚合
  • range_definition:这个标识聚合窗口的聚合数据范围,在 Flink 中有两种指定数据范围的方式。
    • 第一种为按照行数聚合​,
    • 第二种为按照时间区间聚合。

可以在一个SELECT子句中定义多个OVER窗口聚合。但是,对于流查询,由于当前的限制,所有聚合的OVER窗口必须相同。

二、案例

数据源

-- 源数据表
CREATE TABLE orders (
    order_id BIGINT,
    product BIGINT,
    amount BIGINT,
    order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
    WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.order_id.min' = '1',
  'fields.order_id.max' = '2',
  'fields.amount.min' = '1',
  'fields.amount.max' = '10',
  'fields.product.min' = '1',
  'fields.product.max' = '2'
);

-- 结果表
CREATE TABLE sink_table (
    product BIGINT,
    order_time TIMESTAMP(3),
    amount BIGINT,
    one_hour_prod_amount_sum BIGINT
) WITH (
  'connector' = 'print'
);

2.1、行数聚合

按照行数聚合就是数据行数的一个滑动窗口,比如下面案例,最新输出的一条数据的 sum 聚合结果就是最近 5 行数据的 amount 之和。

INSERT INTO sink_table
SELECT product, order_time, amount,
  SUM(amount) OVER (
    PARTITION BY product
    ORDER BY order_time
    -- 标识统计范围是一个 product 的最近 5 行数据
    ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
  ) AS one_hour_prod_amount_sum
FROM orders;

flink web控制台查看结果

2.2、时间聚合

按照时间区间聚合就是时间区间的一个滑动窗口,比如下面案例 1 小时的区间,最新输出的一条数据的 sum 聚合结果就是最近一小时数据的 amount 之和。

INSERT INTO sink_table
SELECT product, order_time, amount,
  SUM(amount) OVER (
    PARTITION BY product
    ORDER BY order_time
    -- 标识统计范围是一个 product 的最近 1 小时的数据
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
  ) AS one_hour_prod_amount_sum
FROM orders;

2.3、在一个 SELECT 中有多个聚合窗口的聚合方式

Flink SQL 支持了一种简化写法,如下案例:

SELECT order_id, order_time, amount,
  SUM(amount) OVER w AS sum_amount,
  AVG(amount) OVER w AS avg_amount
FROM orders
-- 使用下面子句,定义 Over Window
WINDOW w AS (
  PARTITION BY product
  ORDER BY order_time
  RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)

参考:
hive开窗函数-- over()
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/over-agg/

以上是关于Flink SQL--- Over Aggregation的主要内容,如果未能解决你的问题,请参考以下文章

flink笔记16 flink table windows(Group Windows/Over Windows)

flink笔记16 flink table windows(Group Windows/Over Windows)

Flink 极简教程: 架构及原理 Apache Flink® — Stateful Computations over Data Streams

Flink SQL ---Top-N ,Window Top-N

Apache Flink®极简教程: 架构及原理 Stateful Computations over Data Streams

Flink Unit Test over ProcessWindowFunction<IN, OUT, KEY, W>