Flink Doris batch example
Posted by Z-hhhhh
Scenario
A batch job: query Doris and write the aggregated result back into Doris, using flink-doris-connector.
Versions
flink 1.14.4
flink-doris-connector 1.1.0
All dependencies
Following the official docs, the full dependency list:
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<scala.version>2.12</scala.version>
<java.version>1.8</java.version>
<flink.version>1.14.4</flink.version>
<fastjson.version>1.2.62</fastjson.version>
<hadoop.version>2.8.3</hadoop.version>
<scope.mode>compile</scope.mode>
<slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- Add log dependencies when debugging locally -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<!-- flink-doris-connector -->
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.14_2.12</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.12</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.2.1</version>
<exclusions>
<exclusion>
<artifactId>flink-shaded-guava</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<args>
<arg>-feature</arg>
</args>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
Implementation
Source table schema:
Field         Type
endTime       DATE
cid           INT           Yes
pid           INT           Yes
machineId     INT
programCode   VARCHAR(50)
yieldNum      INT
guid          VARCHAR(255)
Business requirement (simple): sum yieldNum grouped by cid, pid, machineId, and programCode.
The sink table schema is therefore:
Field         Type
machineId     INT
cid           INT           Yes
pid           INT           Yes
totalYield    INT
endTime       DATE
programCode   VARCHAR(50)
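The sink table has to exist in Doris before the job runs. A minimal sketch of a possible DDL — the key model, bucket count, and replication number here are assumptions for illustration, not taken from the source:

```sql
-- Hypothetical DDL for db.table2; adjust key model, buckets, and replication
-- to your cluster. Column order matches the Flink sink table definition.
CREATE TABLE db.table2 (
    machineId   INT,
    cid         INT,
    pid         INT,
    totalYield  INT,
    endTime     DATE,
    programCode VARCHAR(50)
) DUPLICATE KEY (machineId, cid, pid)
DISTRIBUTED BY HASH(machineId) BUCKETS 8
PROPERTIES ("replication_num" = "1");
```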
Straight to the code, with comments:
// Required imports for the snippet below
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Deprecated since Flink 1.12; event time is already the default characteristic.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// Note: these checkpoint settings only take effect in streaming mode;
// Flink does not checkpoint in BATCH execution mode.
env.enableCheckpointing(10000);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(30000)));
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// register a table in the catalog
// Doris source table
tEnv.executeSql(
    "CREATE TABLE cdc_test_source (\n" +
    "  endTime DATE,\n" +
    "  cid INT,\n" +
    "  pid INT,\n" +
    "  machineId INT,\n" +
    "  programCode STRING,\n" +
    "  yieldNum INT,\n" +
    "  guid STRING\n" +
    ") WITH (\n" +
    "  'connector' = 'doris',\n" +
    "  'fenodes' = '172.8.10.xxx:8030',\n" +
    "  'username' = 'xxx',\n" +
    "  'password' = 'xxx',\n" +
    "  'table.identifier' = 'db.table1'\n" +
    ")");
// Doris sink table
tEnv.executeSql(
    "CREATE TABLE doris_test_sink (\n" +
    "  machineId INT,\n" +
    "  cid INT,\n" +
    "  pid INT,\n" +
    "  totalYield INT,\n" +
    "  endTime DATE,\n" +
    "  programCode STRING\n" +
    ") WITH (\n" +
    "  'connector' = 'doris',\n" +
    "  'fenodes' = '172.8.10.xxx:8030',\n" +
    "  'table.identifier' = 'db.table2',\n" +
    "  'username' = 'xxx',\n" +
    "  'password' = 'xxx',\n" +
    /* Doris Stream Load label. In the exactly-once scenario the label is
       globally unique, and a restart must resume from the latest checkpoint.
       Exactly-once semantics can be turned off via sink.enable-2pc. */
    "  'sink.label-prefix' = 'doris_label',\n" +
    "  'sink.properties.format' = 'json',\n" + // JSON data format
    "  'sink.properties.read_json_by_line' = 'true'\n" +
    ")");
// insert into the Doris sink table
// tEnv.executeSql(" select machineId, cid, pid, sum(yieldNum) as totalYield, '2022-06-07' as endTime, programCode from cdc_test_source where machineId > 129490 and endTime = '2022-06-07' group by cid, pid, machineId, programCode ").print();
tEnv.executeSql("insert into doris_test_sink select machineId, cid, pid, sum(yieldNum) as totalYield, cast('2022-07-14' as date) as endTime, programCode from cdc_test_source where machineId > 129490 and endTime = '2022-07-14' group by cid, pid, machineId, programCode");
Exception
Caused by: org.apache.doris.flink.exception.DorisException: Load status is Label Already Exists and load job finished, change you label prefix or restore from latest savepoint!
'sink.label-prefix' must differ on every run; if the label already exists, Doris treats the submission as an existing load job. A random value such as a UUID works:
" 'sink.label-prefix' = '" + UUID.randomUUID() + "',\n" +
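As a standalone sketch of that fix, a tiny helper (hypothetical, not part of the connector API) that builds a fresh label prefix per run, so two submissions never reuse a label:

```java
import java.util.UUID;

public class LabelPrefix {
    // Returns a label prefix that is unique per job run. Doris load labels
    // must be unique within a database, so reusing a prefix across finished
    // runs triggers "Label Already Exists".
    static String uniqueLabelPrefix(String base) {
        // UUIDs contain only letters, digits, and dashes, which are legal
        // label characters.
        return base + "_" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        System.out.println(uniqueLabelPrefix("doris_batch"));
    }
}
```

If you keep exactly-once (sink.enable-2pc) on, the alternative the error message suggests is to keep a fixed prefix and restore the job from the latest checkpoint or savepoint instead.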