flink sql clinet 实战:窗口函数----flink-1.13.6

Posted 宝哥大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink sql clinet 实战:窗口函数----flink-1.13.6相关的知识,希望对你有一定的参考价值。

文章目录

1、造模拟数据

2、创建kafka表

详细的kafka connector 详见官网

CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    `ts` TIMESTAMP(3),
	proctime as PROCTIME(),   -- 处理时间列
    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- 在ts上定义watermark,ts成为事件时间列
) WITH (
    'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'user_behavior',  -- kafka topic
    'scan.startup.mode' = 'latest-offset', -- 从起始 offset 开始读取
	'properties.bootstrap.servers' = '10.106.100.228:6667',
	'properties.group.id' = 'testGroup',
	'format' = 'csv'
);

三、窗口函数 Windowing table-valued functions (Windowing TVFs)

官网: https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/window-tvf/

窗口函数分类:

  • Tumble Windows
  • Hop Windows
  • Cumulate Windows
  • Session Windows (will be supported soon)

窗口 TVF 的返回值是一个新的关系,它包括原关系的所有列,以及另外3个列“window_start”、“window_end”、“window_time”,以指示分配的窗口。

3.1、滚动窗口 (TUMBLE)

每个窗口没有重叠,窗口长度固定

滚动窗口的函数格式:

TUMBLE(TABLE data, DESCRIPTOR(timecol), size)
- data 表名
- timecol 时间属性字段
- 时间窗口的大小  INTERVAL '10' MINUTES

测试案例
表结构

Flink SQL> desc user_log;
+-------------+-----------------------------+-------+-----+---------------+----------------------------+
|        name |                        type |  null | key |        extras |                  watermark |
+-------------+-----------------------------+-------+-----+---------------+----------------------------+
|     user_id |                      STRING |  true |     |               |                            |
|     item_id |                      STRING |  true |     |               |                            |
| category_id |                      STRING |  true |     |               |                            |
|    behavior |                      STRING |  true |     |               |                            |
|          ts |      TIMESTAMP(3) *ROWTIME* |  true |     |               | `ts` - INTERVAL '5' SECOND |
|    proctime | TIMESTAMP_LTZ(3) *PROCTIME* | false |     | AS PROCTIME() |                            |
+-------------+-----------------------------+-------+-----+---------------+----------------------------+
6 rows in set

1、窗口函数不可以单独使用,需要聚合函数,按照 window_start、window_end 分区

Flink SQL>  SELECT * FROM TABLE(
   TUMBLE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '1' MINUTES));
   
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Currently Flink doesn't support individual window table-valued function TUMBLE(time_col=[ts], size=[1 min]).
 Please use window table-valued function with aggregate together using window_start and window_end as group keys.


-- 按照 window_start、window_end 分区

SELECT window_start, window_end, count(user_id) 
	FROM TABLE(TUMBLE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '1' MINUTES))
	GROUP BY window_start, window_end;

3.2、滑动窗口(HOP)

相对于滚动窗口,滑动窗口多一个 slide 参数,用于表示每个窗口滑动大小

滑动窗口函数表达式:

HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
- data- timecol 时间字段
- slide 滑动长度
- size 窗口大小

测试案例:
一个滑动窗口,窗口长度1分钟,滑动距离30秒

SELECT window_start, window_end, count(1)
  FROM TABLE(
    HOP(TABLE user_log, DESCRIPTOR(ts), INTERVAL '30' SECONDS, INTERVAL '1' MINUTES))
  GROUP BY window_start, window_end;

3.3、累计窗口(CUMULATE )

Cumulate window 就是累计窗口,简单来说,以下图里面时间轴上的一个区间为窗口步长(step)。
第一个 window 统计的是一个区间的数据;

第二个 window 统计的是第一区间和第二个区间的数据;

第三个 window 统计的是第一区间,第二个区间和第三个区间的数据。

累积计算在业务场景中非常常见,如累积 UV 场景。在 UV 大盘曲线中:我们每隔 10 分钟统计一次当天累积用户 UV。

累计窗口表达式:

CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
- data- timecol 时间字段
- step 步长
- size 窗口大小

每30秒,统一最近一分钟的pv

SELECT window_start, window_end, count(1)
  FROM TABLE(
    CUMULATE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '30' SECONDS, INTERVAL '1' MINUTES))
  GROUP BY window_start, window_end;

累加窗口:(1 MINUTE,1 DAY) 按照1分钟划分窗口,每分钟计算当前分钟的数据 merge 当前分钟的前一分钟的数据结果
按照 订单数据事件时间+水位线 进行窗口触发执行

SELECT window_start, window_end, count(1)
  FROM TABLE(
    CUMULATE(TABLE user_log, DESCRIPTOR(ts), INTERVAL '30' SECONDS, INTERVAL '1' DAYS))
  GROUP BY window_start, window_end;

以上是关于flink sql clinet 实战:窗口函数----flink-1.13.6的主要内容,如果未能解决你的问题,请参考以下文章

flink sql clinet 实战:upsert kafka connector -- flink-1.12

Flink SQL 分组窗口函数 Group Window 实战

Flink SQL 窗口表值函数 Window TVF 实战

Flink实战系列Flink SQL 之 Session Window 的用法

Flink处理函数实战之四:窗口处理

Flink SQL Client综合实战