Flink SQL --维表join

Posted 宝哥大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink SQL --维表join相关的知识,希望对你有一定的参考价值。

文章目录


本文基于flink-1.13.6

一、维表 join 介绍

维表是数仓中的一个概念,维表中的维度属性是观察数据的角度,在建设离线数仓的时候,通常是将维表与事实表进行关联构建星型模型。在实时数仓中,同样也有维表与事实表的概念,其中事实表通常存储在kafka中,维表通常存储在外部设备中(比如mysql,HBase)。对于每条流式数据,可以关联一个外部维表数据源,为实时计算提供数据关联查询。维表可能是会不断变化的,在维表JOIN时,需指明这条记录关联维表快照的时刻。需要注意是,目前Flink SQL的维表JOIN仅支持对当前时刻维表快照的关联(处理时间语义),而不支持事实表rowtime所对应的的维表快照(事件时间语义)

二、Temporal Table Join

使用语法

SELECT column-names
FROM table1  [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime [AS <alias2>]
ON table1.column-name1 = table2.key-name1

注意:目前,仅支持INNER JOIN与LEFT JOIN。在join的时候需要使用 FOR SYSTEM_TIME AS OF ,其中table1.proctime表示table1的proctime处理时间属性(计算列)。使用FOR SYSTEM_TIME AS OF table1.proctime表示当左边表的记录与右边的维表join时,只匹配当前处理时间维表所对应的的快照数据。

样例

SELECT
  o.amout, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency

使用说明
仅支持Blink planner
仅支持SQL,目前不支持Table API
目前不支持基于事件时间(event time)的temporal table join
维表可能会不断变化,JOIN行为发生后,维表中的数据发生了变化(新增、更新或删除),则已关联的维表数据不会被同步变化
维表和维表不能进行JOIN
维表必须指定主键。维表JOIN时,ON的条件必须包含所有主键的等值条件

三、维表Join案例

3.1、背景

Kafka中有一份用户行为数据,包括pv,buy,cart,fav行为;MySQL中有一份省份区域的维表数据。现将两种表进行JOIN,统计每个区域的购买行为数量。

3.2、实践

3.2.1、维表存储在MySQL中

-- mysql
CREATE TABLE `dim_province` (
  `province_id` bigint(20) DEFAULT NULL,
  `province_name` varchar(50) DEFAULT NULL,
  `region_name` varchar(50) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


insert into dim_province (province_id, province_name, region_name) 
values 
(1, "山东", "华东"),
(2, "广东", "华南"),
(3, "河南", "华中"),
(4, "北京", "华北"),
(5, "新疆", "西北");

-- flinksql  维度表
CREATE TABLE dim_province (
    province_id BIGINT,  -- 省份id
    province_name  VARCHAR, -- 省份名称
 region_name VARCHAR -- 区域名称
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://chb1:3306/chb_test?useUnicode=true&characterEncoding=UTF-8',
    'connector.table' = 'dim_province',
    'connector.username' = 'root',
    'connector.password' = '123456',
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl' = '10min'
);

注意: 加上useUnicode=true&characterEncoding=UTF-8,否则 flinksql 写到 mysql 产生乱码

3.2.2、事实数据存在 kafka

事实表存储在kafka中,数据为用户点击行为,格式为csv,具体数据样例如下:

1,1002,10002,fav,2022-10-27 16:25:00,2
1,1004,10002,cart,2022-10-27 16:25:01,3
6,1004,10004,pv,2022-10-27 16:25:01,3
3,1002,10001,cart,2022-10-27 16:25:01,1
4,1001,10004,fav,2022-10-27 16:25:01,4

创建kafka数据源表,如下:

CREATE TABLE user_behavior (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    `ts` timestamp(3),
	 province_id INT,   -- 用户所在的省份id
	`proctime` as PROCTIME(),   -- 处理时间列
    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- 在ts上定义watermark,ts成为事件时间列
) WITH (
    'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'user_behavior',  -- kafka topic
    'scan.startup.mode' = 'latest-offset', -- 从起始 offset 开始读取
	'properties.bootstrap.servers' = 'chb1:9092',
	'properties.group.id' = 'testGroup',
	'format' = 'csv'
);

3.2.3、创建MySQL的结果表,表示区域销量

-- mysql
 CREATE TABLE top_region (
    region_name varchar(50),  -- 区域名称
    buy_cnt BIGINT  -- 销量
)ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- flinksql
CREATE TABLE region_sales_sink (
    region_name STRING,  -- 区域名称
    buy_cnt BIGINT,  -- 销量
	proctime as PROCTIME()
) WITH (

    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://chb1:3306/chb_test?useUnicode=true&characterEncoding=UTF-8',
    'connector.table' = 'top_region', -- MySQL中的待插入数据的表
    'connector.username' = 'root',
    'connector.password' = '123456',
    'connector.write.flush.interval' = '1s'
);

3.2.4、用户行为数据与省份维表数据 join

CREATE VIEW user_behavior_detail AS
SELECT
  u.user_id, 
  u.item_id,
  u.category_id,
  u.behavior,  
  p.province_name,
  p.region_name
FROM user_behavior AS u LEFT JOIN dim_province FOR SYSTEM_TIME AS OF u.proctime AS p
ON u.province_id = p.province_id;

3.2.5、计算区域的销量,并将计算结果写入MySQL

-- 结果
INSERT INTO region_sales_sink
SELECT 
  region_name,
  COUNT(*) buy_cnt
FROM user_behavior_detail
WHERE behavior = 'buy'
GROUP BY region_name;

参考:
Flink SQL— CREATE语句
Flink Temporal Join Versioned Table Demo

以上是关于Flink SQL --维表join的主要内容,如果未能解决你的问题,请参考以下文章

Flink学习之flink sql

Flink学习之flink sql

flink1.12.1扩展flink-sql 支持写入到sqlserver

Flink学习:Flink Table/Sql API

解析和校验Flink SQL语句

解析和校验Flink SQL语句