flinksql source doris案例

Posted Z-hhhhh

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flinksql source doris案例相关的知识,希望对你有一定的参考价值。

上一篇文章使用doris作为sink,
为了简便,直接采用上一篇文章的worker表作为source
上一篇传送门

首先创建环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

然后创建flink doris table 映射表
需要注意的是:
doris的数据类型和flink支持的类型,不完全一致

tEnv.executeSql("create  table doris_worker(\\n" +
                " `startTime` String  ,\\n" +
                " `id` int ,\\n" +
                " `name` String,\\n" +
                "  `age` int ,\\n" +
                "  `city` String ,\\n" +
                "  `salary` int \\n" +
                ")with(\\n" +
                "'connector' = 'doris',\\n" +
                "'fenodes' = 'xxx.xxx.xxx.xxx.xxx:8030',\\n" +
                " 'table.identifier'='db.worker',\\n" +
                "   'sink.batch.size' = '2',\\n" +
                "    'username' = 'username',\\n" +
                "    'password' = 'password'\\n" +
                ")");

最后执行sql语句即可

tEnv.executeSql("select * from doris_worker where id >2").print();

这里需要注意的是
tEnv.executeSql() 执行了sql之后,不需要再去执行 env.execute(“”);
若添加会提示:

 No operators defined in streaming topology. Cannot execute.

实践之后,才知道这么简单。

以上是关于flinksql source doris案例的主要内容,如果未能解决你的问题,请参考以下文章

flinksql source doris案例

flinksql source doris案例

flink cdc MySQL2Doris 案例分享

flink doris batch案例

flink doris batch案例

flink doris batch案例