flink doris batch example

Posted by Z-hhhhh


Business scenario

A batch job: query Doris and write the aggregated results back into Doris, using flink-doris-connector.

Versions

flink 1.14.4

flink-doris-connector 1.1.0

Dependencies

All of the dependencies, as listed per the official documentation:

   <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scala.version>2.12</scala.version>
        <java.version>1.8</java.version>
        <flink.version>1.14.4</flink.version>
        <fastjson.version>1.2.62</fastjson.version>
        <hadoop.version>2.8.3</hadoop.version>
        <scope.mode>compile</scope.mode>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <!-- Add log dependencies when debugging locally -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <!-- flink-doris-connector -->
        <dependency>
            <groupId>org.apache.doris</groupId>
            <artifactId>flink-doris-connector-1.14_2.12</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.12</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.2.1</version>
            <exclusions>
                <exclusion>
                    <artifactId>flink-shaded-guava</artifactId>
                    <groupId>org.apache.flink</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <args>
                        <arg>-feature</arg>
                    </args>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

Implementation

Source table schema

Field        Type          Null
endTime      DATE
cid          INT           Yes
pid          INT           Yes
machineId    INT
programCode  VARCHAR(50)
yieldNum     INT
guid         VARCHAR(255)

The business requirement is simple: aggregate yieldNum, grouping by cid, pid, machineId, and programCode. For example, two source rows that share the same (cid, pid, machineId, programCode) key with yieldNum values 3 and 5 collapse into a single sink row with totalYield 8.

The sink table schema is therefore:

Field        Type          Null
machineId    INT
cid          INT           Yes
pid          INT           Yes
totalYield   INT
endTime      DATE
programCode  VARCHAR(50)

Straight to the code, with comments:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // run the job in batch execution mode
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        // note: the checkpoint settings below only take effect in streaming mode;
        // checkpointing is not supported in BATCH execution mode
        env.enableCheckpointing(10000);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(30000)));
        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // register a table in the catalog
        // doris source table
        tEnv.executeSql(
                "CREATE TABLE cdc_test_source (\n" +
                        "  endTime DATE,\n" +
                        "  cid INT,\n" +
                        "  pid INT,\n" +
                        "  machineId INT,\n" +
                        "  programCode STRING,\n" +
                        "  yieldNum INT,\n" +
                        "  guid STRING\n" +
                        ") WITH (\n" +
                        "  'connector' = 'doris',\n" +
                        "  'fenodes' = '172.8.10.xxx:8030',\n" +
                        "  'username' = 'xxx',\n" +
                        "  'password' = 'xxx',\n" +
                        "  'table.identifier' = 'db.table1'\n" +
                        ")");


        // doris sink table
        tEnv.executeSql(
                "CREATE TABLE doris_test_sink (\n" +
                        "  machineId INT,\n" +
                        "  cid INT,\n" +
                        "  pid INT,\n" +
                        "  totalYield INT,\n" +
                        "  endTime DATE,\n" +
                        "  programCode STRING\n" +
                        ") WITH (\n" +
                        "  'connector' = 'doris',\n" +
                        "  'fenodes' = '172.8.10.xxx:8030',\n" +
                        "  'table.identifier' = 'db.table2',\n" +
                        "  'username' = 'xxx',\n" +
                        "  'password' = 'xxx',\n" +
                /* Doris stream load label. In the exactly-once scenario the label is
                   globally unique, and a restart must resume from the latest checkpoint.
                   Exactly-once semantics can be turned off via sink.enable-2pc. */
                        "  'sink.label-prefix' = 'doris_label',\n" +
                        "  'sink.properties.format' = 'json',\n" +            // json data format
                        "  'sink.properties.read_json_by_line' = 'true'\n" +
                        ")");

        // insert into the doris sink table
        // (the commented line below previews the query result with print() instead of writing)
//        tEnv.executeSql("select machineId, cid, pid, sum(yieldNum) as totalYield, '2022-06-07' as endTime, programCode from cdc_test_source where machineId > 129490 and endTime = '2022-06-07' group by cid, pid, machineId, programCode").print();
        tEnv.executeSql("insert into doris_test_sink select machineId, cid, pid, sum(yieldNum) as totalYield, cast('2022-07-14' as date) as endTime, programCode from cdc_test_source where machineId > 129490 and endTime = '2022-07-14' group by cid, pid, machineId, programCode");

Exception

Caused by: org.apache.doris.flink.exception.DorisException: Load status is Label Already Exists and load job finished, change you label prefix or restore from latest savepoint!

'sink.label-prefix' must be different on every run: if the label already exists, Doris treats the load as an existing job. A simple workaround is to use a random value such as a UUID:

"  'sink.label-prefix' ='" + UUID.randomUUID() + "',\\n" +
