FlinkSQL--时态表或版本表(Temporal Tables 或 Versioned Tables)
Posted 宝哥大数据
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FlinkSQL--时态表或版本表(Temporal Tables 或 Versioned Tables)相关的知识,希望对你有一定的参考价值。
文章目录
一、概念
时态表(Temporal Table)是一张随时间变化的表 – 在 Flink 中称为动态表,时态表中的每条记录都关联了一个或多个时间段,所有的 Flink 表都是时态的(动态的)。
时态表包含表的一个或多个有版本的表快照,时态表可以是一张跟踪所有变更记录的表(例如数据库表的 changelog,包含多个表快照),也可以是物化所有变更之后的表(例如数据库表,只有最新表快照)。
版本: 时态表可以划分成一系列带版本的表快照集合,表快照中的版本代表了快照中所有记录的有效区间,有效区间的开始时间和结束时间可以通过用户指定,根据时态表是否可以追踪自身的历史版本与否,时态表可以分为 版本表 和 普通表。
版本表: 如果时态表中的记录可以追踪和并访问它的历史版本,这种表我们称之为版本表,来自数据库的 changelog 可以定义成版本表。
普通表: 如果时态表中的记录仅仅可以追踪并和它的最新版本,这种表我们称之为普通表,来自数据库 或 HBase 的表可以定义成普通表。
二、设计初衷
2.1、关联一张版本表
以订单流关联产品表这个场景举例,orders 表包含了来自 Kafka 的实时订单流,product_changelog 表来自数据库表 products 的 changelog , 产品的价格在数据库表 products 中是随时间实时变化的。
SELECT * FROM product_changelog;
(changelog kind) update_time product_id product_name price
================= =========== ========== ============ =====
+(INSERT) 00:01:00 p_001 scooter 11.11
+(INSERT) 00:02:00 p_002 basketball 23.11
-(UPDATE_BEFORE) 12:00:00 p_001 scooter 11.11
+(UPDATE_AFTER) 12:00:00 p_001 scooter 12.99
-(UPDATE_BEFORE) 12:00:00 p_002 basketball 23.11
+(UPDATE_AFTER) 12:00:00 p_002 basketball 19.99
-(DELETE) 18:00:00 p_001 scooter 12.99
表 product_changelog 表示数据库表 products不断增长的 changelog, 比如,产品 scooter 在时间点 00:01:00的初始价格是 11.11, 在 12:00:00 的时候涨价到了 12.99, 在 18:00:00 的时候这条产品价格记录被删除。
如果我们想输出 product_changelog 表在 10:00:00 对应的版本,表的内容如下所示:
update_time product_id product_name price
=========== ========== ============ =====
00:01:00 p_001 scooter 11.11
00:02:00 p_002 basketball 23.11
如果我们想输出 product_changelog 表在 13:00:00 对应的版本,表的内容如下所示:
update_time product_id product_name price
=========== ========== ============ =====
12:00:00 p_001 scooter 12.99
12:00:00 p_002 basketball 19.99
2.2、关联一张普通表
另一方面,某些用户案列需要连接变化的维表,该表是外部数据库表。
假设 LatestRates 是一个物化的最新汇率表 (比如:一张 HBase 表),LatestRates 总是表示 HBase 表 Rates 的最新内容。
我们在 10:15:00 时查询到的内容如下所示:
10:15:00 > SELECT * FROM LatestRates;
currency rate
========= ====
US Dollar 102
Euro 114
Yen 1
我们在 11:00:00 时查询到的内容如下所示:
11:00:00 > SELECT * FROM LatestRates;
currency rate
========= ====
US Dollar 102
Euro 116
Yen 1
三、时态表
注意 仅 Blink planner 支持此功能。
Flink 使用主键约束和事件时间来定义一张版本表和版本视图。
3.1、声明版本表
create table currency_rates (
currency STRING,
conversion_rate DECIMAL(32, 2),
update_time TIMESTAMP(3),
PRIMARY KEY (currency) NOT ENFORCED, -- 1、定义主键
WATERMARK FOR update_time AS update_time -- 2、事件时间
) WITH (
'connector.type' = 'filesystem',
'connector.path' = '/tmp/ratesHistory.csv',
'format.type' = 'csv'
)
3.2、声明版本视图
-- 定义一张 append-only 表
create table RatesHistory (
currency STRING,
conversion_rate DECIMAL(32, 2),
update_time TIMESTAMP(3),
-- PRIMARY KEY (currency) NOT ENFORCED,
WATERMARK FOR update_time AS update_time
) WITH (
'connector.type' = 'filesystem',
'connector.path' = '/tmp/ratesHistory.csv',
'format.type' = 'csv'
);
为了在 RatesHistory 上定义版本表,Flink 支持通过去重查询定义版本视图, 去重查询可以产出一个有序的 changelog 流,去重查询能够推断主键并保留原始数据流的事件时间属性。
create view versioned_rates AS
SELECT currency,conversion_rate,update_time FROM -- (1) `currency_time` 保留了事件时间
(SELECT *,
row_number() over(partition by currency -- (2) `currency` 是去重 query 的 unique key,可以作为主键
ORDER BY update_time DESC) AS row_num
FROM currency_rates)
WHERE row_num=1;
行 (1)
保留了事件时间作为视图 versioned_rates
的事件时间,
行 (2)
使得视图 versioned_rates
有了主键, 因此视图 versioned_rates
是一个版本视图。
视图中的去重 query 会被 Flink 优化并高效地产出 changelog stream, 产出的 changelog 保留了主键约束和事件时间。
3.3、声明普通表
没有设置主键
create table orders (
order_id STRING,
price DECIMAL(32,2),
currency STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time
) WITH (
'connector.type' = 'filesystem',
'connector.path' = '/tmp/rateOrder.csv',
'format.type' = 'csv'
);
四、时态表函数
4.1、定义时态表函数
五、案例
5.1、
如果流延迟过来的数据要跟之前的维表数据做关联,即根据流的事件时间,查找某个时间点的维度数据而不是当前维度表数据。
比如这样一个场景:用户的订单表和和商品维度表,将维度表设置成时态表,这样用户就可以根据订单表中的下单时间Join下单时的商品当时最新的维度数据
样例数据
数据源:ratesHistory.csv
RMB,114,2015-01-01 00:00:00
RMB,115,2015-01-03 00:00:00
RMB,116,2015-01-19 00:00:00
Euro,119,2015-01-03 00:00:00
USD,99,2015-01-03 00:00:00
USD,100,2015-01-03 00:00:00
Euro,118,2015-01-03 00:00:00
数据源:rateOrder.csv
1,29,RMB,2015-01-02 00:00:00
2,19,RMB,2015-01-03 00:00:00
3,33,RMB,2015-01-11 00:00:00
4,55,RMB,2015-01-21 00:00:00
create table currency_rates (
currency STRING,
conversion_rate DECIMAL(32, 2),
update_time TIMESTAMP(3),
PRIMARY KEY (currency) NOT ENFORCED,
WATERMARK FOR update_time AS update_time
) WITH (
'connector.type' = 'filesystem',
'connector.path' = '/tmp/ratesHistory.csv',
'format.type' = 'csv'
)
create table orders (
order_id STRING,
price DECIMAL(32,2),
currency STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time
) WITH (
'connector.type' = 'filesystem',
'connector.path' = '/tmp/rateOrder.csv',
'format.type' = 'csv'
);
SELECT order_id, price, orders.currency, conversion_rate, order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time -- Event Time Temporal Join
ON orders.currency = currency_rates.currency;
参考:
https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/table/streaming/versioned_tables.html
https://www.jianshu.com/p/733a53bcb9b9
以上是关于FlinkSQL--时态表或版本表(Temporal Tables 或 Versioned Tables)的主要内容,如果未能解决你的问题,请参考以下文章
1.19.5.3.时态表关联一张版本表关联一张普通表时态表声明版本表声明版本视图声明普通表时态表函数等
在 Entity Framework Core 中查询系统版本时态表中的数据