Flink SQL: Querying Hudi Data in Real Time
Posted by wudl5566
1. Versions
Component | Version |
---|---|
Hudi | 0.10.0 |
Flink | 1.13.5 |
2. Scenario
Create a table t1 in Flink SQL and insert data into it, then query the same underlying Hudi table in real time through a second table, t2, defined in another session.
(Figure: scenario diagram)
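Before creating the tables, the Flink SQL client is usually prepared with a couple of settings. This is only a sketch of an assumed setup: tableau mode prints query results directly in the CLI, and a checkpoint interval matters for continuous writes because the Hudi Flink writer finalizes its commits when a checkpoint completes (for the bounded VALUES inserts below it is optional).
-- print query results directly in the SQL client
set sql-client.execution.result-mode = tableau;
-- optional here: for continuous writes Hudi commits on checkpoint completion,
-- so an interval such as 10s is typically configured
set execution.checkpointing.interval = 10s;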
3. Create table t1
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://192.168.1.161:8020/hudi-warehouse/hudi-t1',  -- base path of the Hudi table on HDFS
'write.tasks' = '1',            -- parallelism of the write tasks
'compaction.tasks' = '1',       -- parallelism of the online compaction tasks
'table.type' = 'MERGE_ON_READ'  -- MOR table type (the default is COPY_ON_WRITE)
);
3.1 Insert data
INSERT INTO t1 VALUES('id1','Danny',28,TIMESTAMP '1970-01-01 00:00:01','par1');
insert into t1 values ('id9','Danny',18,TIMESTAMP'1970-01-01 00:00:01','par9');
INSERT INTO t1 VALUES
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
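Once these insert jobs finish, a plain snapshot query on t1 in the same session should return the rows written above:
-- snapshot (batch) read of the freshly written table
select * from t1;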
4. Open another window and create table t2
CREATE TABLE t2(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://192.168.1.161:8020/hudi-warehouse/hudi-t1',
'table.type' = 'MERGE_ON_READ',
'read.tasks' = '1',
'read.streaming.enabled' = 'true', -- this option enables the streaming read
'read.streaming.start-commit' = '20210316134557', -- specifies the start commit instant time
'read.streaming.check-interval' = '4' -- specifies the check interval for finding new source commits, default 60s.
);
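The DDL above only registers t2; the incremental read starts when a query is submitted against it. A plain SELECT launches a continuous job that keeps emitting rows from new Hudi commits, polling every 4 seconds per the check interval configured above:
-- streaming query: does not terminate, keeps printing newly committed rows
select * from t2;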
5. Rows inserted into t1 show up in t2 in real time
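For example, writing one more row to t1 in the first session (the values below are made up purely for illustration) should appear in the running t2 query within roughly one check interval:
-- hypothetical extra row used to demonstrate the streaming read
insert into t1 values ('id10','Alice',25,TIMESTAMP '1970-01-01 00:00:10','par5');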