flinksql source doris案例
Posted Z-hhhhh
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flinksql source doris案例相关的知识,希望对你有一定的参考价值。
上一篇文章使用doris作为sink,
为了简便,直接采用上一篇文章的worker表作为source
上一篇传送门
首先创建环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
然后创建flink doris table 映射表
需要注意的是:
doris的数据类型和flink支持的类型,不完全一致
tEnv.executeSql("create table doris_worker(\\n" +
" `startTime` String ,\\n" +
" `id` int ,\\n" +
" `name` String,\\n" +
" `age` int ,\\n" +
" `city` String ,\\n" +
" `salary` int \\n" +
")with(\\n" +
"'connector' = 'doris',\\n" +
"'fenodes' = 'xxx.xxx.xxx.xxx.xxx:8030',\\n" +
" 'table.identifier'='db.worker',\\n" +
" 'sink.batch.size' = '2',\\n" +
" 'username' = 'username',\\n" +
" 'password' = 'password'\\n" +
")");
最后执行sql语句即可
tEnv.executeSql("select * from doris_worker where id >2").print();
这里需要注意的是
tEnv.executeSql() 执行了sql之后,不需要再去执行 env.execute(“”);
若添加会提示:
No operators defined in streaming topology. Cannot execute.
实践之后,才知道这么简单。
以上是关于flinksql source doris案例的主要内容,如果未能解决你的问题,请参考以下文章