flink sql clinet 实战:窗口函数----flink-1.13.6
Posted 宝哥大数据
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink sql clinet 实战:窗口函数----flink-1.13.6相关的知识,希望对你有一定的参考价值。
文章目录
- 1、[造模拟数据](https://blog.csdn.net/wuxintdrh/article/details/127373437)
- 2、创建kafka表
- 三、窗口函数 Windowing table-valued functions (Windowing TVFs)
1、造模拟数据
2、创建kafka表
CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
`ts` TIMESTAMP(3),
proctime as PROCTIME(), -- 处理时间列
WATERMARK FOR ts as ts - INTERVAL '5' SECOND -- 在ts上定义watermark,ts成为事件时间列
) WITH (
'connector' = 'kafka', -- 使用 kafka connector
'topic' = 'user_behavior', -- kafka topic
'scan.startup.mode' = 'latest-offset', -- 从起始 offset 开始读取
'properties.bootstrap.servers' = '10.106.100.228:6667',
'properties.group.id' = 'testGroup',
'format' = 'csv'
);
三、窗口函数 Windowing table-valued functions (Windowing TVFs)
官网: https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/window-tvf/
窗口函数分类:
- Tumble Windows
- Hop Windows
- Cumulate Windows
- Session Windows (will be supported soon)
窗口 TVF 的返回值是一个新的关系,它包括原关系的所有列,以及另外3个列“window_start”、“window_end”、“window_time”,以指示分配的窗口。
3.1、滚动窗口 (TUMBLE)
每个窗口没有重叠,窗口长度固定
滚动窗口的函数格式:
TUMBLE(TABLE data, DESCRIPTOR(timecol), size)
- data 表名
- timecol 时间属性字段
- 时间窗口的大小 INTERVAL '10' MINUTES
测试案例
表结构
Flink SQL> desc user_log;
+-------------+-----------------------------+-------+-----+---------------+----------------------------+
| name | type | null | key | extras | watermark |
+-------------+-----------------------------+-------+-----+---------------+----------------------------+
| user_id | STRING | true | | | |
| item_id | STRING | true | | | |
| category_id | STRING | true | | | |
| behavior | STRING | true | | | |
| ts | TIMESTAMP(3) *ROWTIME* | true | | | `ts` - INTERVAL '5' SECOND |
| proctime | TIMESTAMP_LTZ(3) *PROCTIME* | false | | AS PROCTIME() | |
+-------------+-----------------------------+-------+-----+---------------+----------------------------+
6 rows in set
1、窗口函数不可以单独使用,需要聚合函数,按照 window_start、window_end 分区
Flink SQL> SELECT * FROM TABLE(
TUMBLE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '1' MINUTES));
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Currently Flink doesn't support individual window table-valued function TUMBLE(time_col=[ts], size=[1 min]).
Please use window table-valued function with aggregate together using window_start and window_end as group keys.
-- 按照 window_start、window_end 分区
SELECT window_start, window_end, count(user_id)
FROM TABLE(TUMBLE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '1' MINUTES))
GROUP BY window_start, window_end;
3.2、滑动窗口(HOP)
相对于滚动窗口,滑动窗口多一个 slide
参数,用于表示每个窗口滑动大小
滑动窗口函数表达式:
HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
- data 表
- timecol 时间字段
- slide 滑动长度
- size 窗口大小
测试案例:
一个滑动窗口,窗口长度1分钟,滑动距离30秒
SELECT window_start, window_end, count(1)
FROM TABLE(
HOP(TABLE user_log, DESCRIPTOR(ts), INTERVAL '30' SECONDS, INTERVAL '1' MINUTES))
GROUP BY window_start, window_end;
3.3、累计窗口(CUMULATE )
Cumulate window 就是累计窗口,简单来说,以下图里面时间轴上的一个区间为窗口步长(step)。
第一个 window 统计的是一个区间的数据;
第二个 window 统计的是第一区间和第二个区间的数据;
第三个 window 统计的是第一区间,第二个区间和第三个区间的数据。
累积计算在业务场景中非常常见,如累积 UV 场景。在 UV 大盘曲线中:我们每隔 10 分钟统计一次当天累积用户 UV。
累计窗口表达式:
CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
- data 表
- timecol 时间字段
- step 步长
- size 窗口大小
每30秒,统一最近一分钟的pv
SELECT window_start, window_end, count(1)
FROM TABLE(
CUMULATE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '30' SECONDS, INTERVAL '1' MINUTES))
GROUP BY window_start, window_end;
累加窗口:(1 MINUTE,1 DAY) 按照1分钟划分窗口,每分钟计算当前分钟的数据 merge 当前分钟的前一分钟的数据结果
按照 订单数据事件时间+水位线 进行窗口触发执行
SELECT window_start, window_end, count(1)
FROM TABLE(
CUMULATE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '30' SECONDS, INTERVAL '1' DAYS))
GROUP BY window_start, window_end;
以上是关于flink sql clinet 实战:窗口函数----flink-1.13.6的主要内容,如果未能解决你的问题,请参考以下文章
flink sql clinet 实战:upsert kafka connector -- flink-1.12
Flink SQL 分组窗口函数 Group Window 实战
Flink SQL 窗口表值函数 Window TVF 实战