Flink SQL --维表join
Posted 宝哥大数据
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink SQL --维表join相关的知识,希望对你有一定的参考价值。
文章目录
本文基于flink-1.13.6
一、维表 join 介绍
维表是数仓中的一个概念,维表中的维度属性是观察数据的角度,在建设离线数仓的时候,通常是将维表与事实表进行关联构建星型模型。在实时数仓中,同样也有维表与事实表的概念,其中事实表通常存储在kafka中,维表通常存储在外部设备中(比如mysql,HBase)。对于每条流式数据,可以关联一个外部维表数据源,为实时计算提供数据关联查询。维表可能是会不断变化的,在维表JOIN时,需指明这条记录关联维表快照的时刻。需要注意是,目前Flink SQL的维表JOIN仅支持对当前时刻维表快照的关联(处理时间语义),而不支持事实表rowtime所对应的的维表快照(事件时间语义)
二、Temporal Table Join
使用语法
SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime [AS <alias2>]
ON table1.column-name1 = table2.key-name1
注意:目前,仅支持INNER JOIN与LEFT JOIN。在join的时候需要使用 FOR SYSTEM_TIME AS OF ,其中table1.proctime表示table1的proctime处理时间属性(计算列)。使用FOR SYSTEM_TIME AS OF table1.proctime表示当左边表的记录与右边的维表join时,只匹配当前处理时间维表所对应的的快照数据。
样例
SELECT
o.amout, o.currency, r.rate, o.amount * r.rate
FROM
Orders AS o
JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
ON r.currency = o.currency
使用说明
仅支持Blink planner
仅支持SQL,目前不支持Table API
目前不支持基于事件时间(event time)的temporal table join
维表可能会不断变化,JOIN行为发生后,维表中的数据发生了变化(新增、更新或删除),则已关联的维表数据不会被同步变化
维表和维表不能进行JOIN
维表必须指定主键。维表JOIN时,ON的条件必须包含所有主键的等值条件
三、维表Join案例
3.1、背景
Kafka中有一份用户行为数据,包括pv,buy,cart,fav行为;MySQL中有一份省份区域的维表数据。现将两种表进行JOIN,统计每个区域的购买行为数量。
3.2、实践
3.2.1、维表存储在MySQL中
-- mysql
CREATE TABLE `dim_province` (
`province_id` bigint(20) DEFAULT NULL,
`province_name` varchar(50) DEFAULT NULL,
`region_name` varchar(50) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
insert into dim_province (province_id, province_name, region_name)
values
(1, "山东", "华东"),
(2, "广东", "华南"),
(3, "河南", "华中"),
(4, "北京", "华北"),
(5, "新疆", "西北");
-- flinksql 维度表
CREATE TABLE dim_province (
province_id BIGINT, -- 省份id
province_name VARCHAR, -- 省份名称
region_name VARCHAR -- 区域名称
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://chb1:3306/chb_test?useUnicode=true&characterEncoding=UTF-8',
'connector.table' = 'dim_province',
'connector.username' = 'root',
'connector.password' = '123456',
'connector.lookup.cache.max-rows' = '5000',
'connector.lookup.cache.ttl' = '10min'
);
注意: 加上useUnicode=true&characterEncoding=UTF-8
,否则 flinksql 写到 mysql 产生乱码
3.2.2、事实数据存在 kafka
事实表存储在kafka中,数据为用户点击行为,格式为csv,具体数据样例如下:
1,1002,10002,fav,2022-10-27 16:25:00,2
1,1004,10002,cart,2022-10-27 16:25:01,3
6,1004,10004,pv,2022-10-27 16:25:01,3
3,1002,10001,cart,2022-10-27 16:25:01,1
4,1001,10004,fav,2022-10-27 16:25:01,4
创建kafka数据源表,如下:
CREATE TABLE user_behavior (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
`ts` timestamp(3),
province_id INT, -- 用户所在的省份id
`proctime` as PROCTIME(), -- 处理时间列
WATERMARK FOR ts as ts - INTERVAL '5' SECOND -- 在ts上定义watermark,ts成为事件时间列
) WITH (
'connector' = 'kafka', -- 使用 kafka connector
'topic' = 'user_behavior', -- kafka topic
'scan.startup.mode' = 'latest-offset', -- 从起始 offset 开始读取
'properties.bootstrap.servers' = 'chb1:9092',
'properties.group.id' = 'testGroup',
'format' = 'csv'
);
3.2.3、创建MySQL的结果表,表示区域销量
-- mysql
CREATE TABLE top_region (
region_name varchar(50), -- 区域名称
buy_cnt BIGINT -- 销量
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- flinksql
CREATE TABLE region_sales_sink (
region_name STRING, -- 区域名称
buy_cnt BIGINT, -- 销量
proctime as PROCTIME()
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://chb1:3306/chb_test?useUnicode=true&characterEncoding=UTF-8',
'connector.table' = 'top_region', -- MySQL中的待插入数据的表
'connector.username' = 'root',
'connector.password' = '123456',
'connector.write.flush.interval' = '1s'
);
3.2.4、用户行为数据与省份维表数据 join
CREATE VIEW user_behavior_detail AS
SELECT
u.user_id,
u.item_id,
u.category_id,
u.behavior,
p.province_name,
p.region_name
FROM user_behavior AS u LEFT JOIN dim_province FOR SYSTEM_TIME AS OF u.proctime AS p
ON u.province_id = p.province_id;
3.2.5、计算区域的销量,并将计算结果写入MySQL
-- 结果
INSERT INTO region_sales_sink
SELECT
region_name,
COUNT(*) buy_cnt
FROM user_behavior_detail
WHERE behavior = 'buy'
GROUP BY region_name;
参考:
Flink SQL— CREATE语句
Flink Temporal Join Versioned Table Demo
以上是关于Flink SQL --维表join的主要内容,如果未能解决你的问题,请参考以下文章