Batch Synchronization Testing Between Doris and MySQL

Posted by 骚戴


Doris and MySQL test results

| Sync type | Update table (column-level) | Update table (row-level) | Append table | Overwrite table |
|---|---|---|---|---|
| Doris -> MySQL | Supported | Supported | Supported | Supported |
| MySQL -> Doris | Supported | Not supported | Not supported | Not supported |

Why MySQL -> Doris fails for row-level update, append table, and overwrite table: the LOAD DATA statement generated for MySQL differs from Doris's MYSQL LOAD syntax, so the statement needs to be adapted.

Doris's LOAD DATA syntax (MYSQL LOAD)

MYSQL LOAD: import local data through the MySQL client.

This statement imports data into the specified table. Unlike a regular Load, this import mode is synchronous.

It still guarantees the atomicity of a batch import task: either all of the data is imported successfully, or the whole batch fails.

1. A MySQL Load starts with the syntax LOAD DATA; no LABEL needs to be specified.
2. Specifying LOCAL means the client's local file is read; omitting it means a file local to the FE server is read. Importing FE-local files is disabled by default; you must set mysql_load_server_secure_path on the FE node to designate a secure path before the feature can be enabled.
3. INFILE takes the local file path, which may be relative or absolute. Currently only a single file is supported, not multiple files.
4. The table name after INTO TABLE may carry a database name, as in the example, or omit it, in which case the current user's database is used.
5. PARTITION supports importing into specified partitions.
6. COLUMNS TERMINATED BY specifies the column separator.
7. LINES TERMINATED BY specifies the line separator.
8. IGNORE num LINES skips the CSV header and can skip any number of lines; IGNORE num ROWS is an equivalent form.
9. Column mapping syntax: see the column mapping section of the "data transformation on import" docs for the exact parameters.
10. PROPERTIES parameter configuration: see the documentation linked below. A grammar sketch assembled from these rules follows this list.
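Putting these rules together, the overall MYSQL LOAD grammar looks roughly like this. It is a sketch assembled from the numbered rules above and the documentation linked below, not an authoritative grammar (bracketed clauses are optional):

LOAD DATA
[LOCAL]
INFILE 'file_name'
INTO TABLE tbl_name
[PARTITION (partition_name [, partition_name] ...)]
[COLUMNS TERMINATED BY 'string']
[LINES TERMINATED BY 'string']
[IGNORE number LINES | IGNORE number ROWS]
[(col_name [, col_name] ...)]
[PROPERTIES ("key1" = "value1" [, "key2" = "value2"])]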

Doris documentation: https://doris.apache.org/zh-CN/docs/dev/sql-manual/sql-reference/Data-Manipulation-Statements/Load/MYSQL-LOAD

The LOAD DATA syntax generated by the big-data platform

LOAD DATA LOCAL INFILE '/tmp/5e5c2e727e2e409a8d5ec25d156ca0cf'
INTO TABLE `temp_e4d256bf87398d45` FIELDS TERMINATED BY ''
OPTIONALLY ENCLOSED BY ''
ESCAPED BY '' (`user_id`,`date`,`city`,`age`,`sex`);
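For comparison, the same load rewritten in Doris's MYSQL LOAD dialect would look roughly like the sketch below. Per the rules above, Doris takes COLUMNS/LINES TERMINATED BY rather than the FIELDS ... OPTIONALLY ENCLOSED BY ... ESCAPED BY clauses that MySQL accepts; the separator values and the PROPERTIES entry here are illustrative assumptions, not values taken from the platform:

LOAD DATA LOCAL
INFILE '/tmp/5e5c2e727e2e409a8d5ec25d156ca0cf'
INTO TABLE unique_test
COLUMNS TERMINATED BY ','        -- assumed column separator; no FIELDS ... clause in the rules above
LINES TERMINATED BY '\n'         -- assumed line separator
(`user_id`, `date`, `city`, `age`, `sex`)
PROPERTIES ("max_filter_ratio" = "0.1");  -- illustrative property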

Error log for the unsupported MySQL -> Doris row-level update:

2023/03/24 10:30:29 - unique_test_output.0 - Starting the null bulk Load in a separate thread - LOAD DATA LOCAL INFILE '/tmp/5e5c2e727e2e409a8d5ec25d156ca0cf' INTO TABLE `temp_e4d256bf87398d45` FIELDS TERMINATED BY '' OPTIONALLY ENCLOSED BY '' ESCAPED BY '' (`user_id`,`date`,`city`,`age`,`sex`);
2023/03/24 10:30:29 - unique_test_output.0 - Opening fifo /tmp/5e5c2e727e2e409a8d5ec25d156ca0cf for writing.
2023/03/24 10:30:29 - unique_test_output.0 - Make sure user has been granted the FILE privilege.
2023/03/24 10:30:29 - unique_test_output.0 - 
2023/03/24 10:30:29 - unique_test_output.0 - Error in step, asking everyone to stop because of:
2023/03/24 10:30:29 - unique_test_output.0 - org.pentaho.di.core.exception.KettleException: 
2023/03/24 10:30:29 - unique_test_output.0 - org.pentaho.di.core.exception.KettleDatabaseException: 
2023/03/24 10:30:29 - unique_test_output.0 - Couldn't execute SQL: LOAD DATA LOCAL INFILE '/tmp/5e5c2e727e2e409a8d5ec25d156ca0cf' INTO TABLE `temp_e4d256bf87398d45` FIELDS TERMINATED BY '' OPTIONALLY ENCLOSED BY '' ESCAPED BY '' (`user_id`,`date`,`city`,`age`,`sex`);
2023/03/24 10:30:29 - unique_test_output.0 - 
2023/03/24 10:30:29 - unique_test_output.0 - errCode = 2, detailMessage = Syntax error in line 1:
2023/03/24 10:30:29 - unique_test_output.0 - LOAD DATA LOCAL INFILE '/tmp/5e5c...
2023/03/24 10:30:29 - unique_test_output.0 -      ^
2023/03/24 10:30:29 - unique_test_output.0 - Encountered: DATA
2023/03/24 10:30:29 - unique_test_output.0 - Expected

The append-table and overwrite-table runs fail with the same "Encountered: DATA" syntax error; their logs are identical to the one above apart from the timestamps, except that the overwrite run loads directly into the target table `unique_test` instead of a temporary table.

Flink CDC MySQL2Doris Case Study: Syncing Multiple Databases and Tables

Case overview

Use Flink CDC to sync multiple MySQL databases and tables into Doris at the same time.

Versions

flink 1.14.4

doris 1.1.0

Dependencies

 <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scala.version>2.12</scala.version>
        <java.version>1.8</java.version>
        <flink.version>1.14.4</flink.version>
        <fastjson.version>1.2.62</fastjson.version>
        <hadoop.version>2.8.3</hadoop.version>
        <scope.mode>compile</scope.mode>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <!-- Add log dependencies when debugging locally -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <!-- flink-doris-connector -->
        <dependency>
            <groupId>org.apache.doris</groupId>
            <artifactId>flink-doris-connector-1.14_2.12</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.12</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.2.0</version>
            <exclusions>
                <exclusion>
                    <artifactId>flink-shaded-guava</artifactId>
                    <groupId>org.apache.flink</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <args>
                        <arg>-feature</arg>
                    </args>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

flink-connector-mysql-cdc 2.2.1 kept throwing this exception:

java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent/ThreadFactoryBuilder

The fix: download the flink-sql-connector-mysql-cdc-2.2.0 dependency from the official site and add it locally.

Prepare the MySQL data

CREATE DATABASE emp_1;
USE emp_1;
CREATE TABLE employees_1 (
    emp_no      INT             NOT NULL,
    birth_date  DATE            NOT NULL,
    first_name  VARCHAR(14)     NOT NULL,
    last_name   VARCHAR(16)     NOT NULL,
    gender      ENUM ('M','F')  NOT NULL,    
    hire_date   DATE            NOT NULL,
    PRIMARY KEY (emp_no)
);

INSERT INTO `employees_1` VALUES (10001,'1953-09-02','Georgi','Facello','M','1986-06-26'),
(10002,'1964-06-02','Bezalel','Simmel','F','1985-11-21'),
(10003,'1959-12-03','Parto','Bamford','M','1986-08-28'),
(10004,'1954-05-01','Chirstian','Koblick','M','1986-12-01'),
(10005,'1955-01-21','Kyoichi','Maliniak','M','1989-09-12'),
(10006,'1953-04-20','Anneke','Preusig','F','1989-06-02'),
(10007,'1957-05-23','Tzvetan','Zielinski','F','1989-02-10'),
(10008,'1958-02-19','Saniya','Kalloufi','M','1994-09-15'),
(10009,'1952-04-19','Sumant','Peac','F','1985-02-18'),
(10010,'1963-06-01','Duangkaew','Piveteau','F','1989-08-24'),
(10011,'1953-11-07','Mary','Sluis','F','1990-01-22'),
(10012,'1960-10-04','Patricio','Bridgland','M','1992-12-18'),
(10013,'1963-06-07','Eberhardt','Terkki','M','1985-10-20'),
(10014,'1956-02-12','Berni','Genin','M','1987-03-11'),
(10015,'1959-08-19','Guoxiang','Nooteboom','M','1987-07-02'),
(10016,'1961-05-02','Kazuhito','Cappelletti','M','1995-01-27'),
(10017,'1958-07-06','Cristinel','Bouloucos','F','1993-08-03'),
(10018,'1954-06-19','Kazuhide','Peha','F','1987-04-03'),
(10019,'1953-01-23','Lillian','Haddadi','M','1999-04-30'),
(10020,'1952-12-24','Mayuko','Warwick','M','1991-01-26'),
(10021,'1960-02-20','Ramzi','Erde','M','1988-02-10'),
(10022,'1952-07-08','Shahaf','Famili','M','1995-08-22'),
(10023,'1953-09-29','Bojan','Montemayor','F','1989-12-17'),
(10024,'1958-09-05','Suzette','Pettey','F','1997-05-19'),
(10025,'1958-10-31','Prasadram','Heyers','M','1987-08-17'),
(10026,'1953-04-03','Yongqiao','Berztiss','M','1995-03-20'),
(10027,'1962-07-10','Divier','Reistad','F','1989-07-07'),
(10028,'1963-11-26','Domenick','Tempesti','M','1991-10-22'),
(10029,'1956-12-13','Otmar','Herbst','M','1985-11-20'),
(10030,'1958-07-14','Elvis','Demeyer','M','1994-02-17'),
(10031,'1959-01-27','Karsten','Joslin','M','1991-09-01'),
(10032,'1960-08-09','Jeong','Reistad','F','1990-06-20'),
(10033,'1956-11-14','Arif','Merlo','M','1987-03-18'),
(10034,'1962-12-29','Bader','Swan','M','1988-09-21'),
(10035,'1953-02-08','Alain','Chappelet','M','1988-09-05'),
(10036,'1959-08-10','Adamantios','Portugali','M','1992-01-03');

CREATE TABLE employees_2 (
    emp_no      INT             NOT NULL,
    birth_date  DATE            NOT NULL,
    first_name  VARCHAR(14)     NOT NULL,
    last_name   VARCHAR(16)     NOT NULL,
    gender      ENUM ('M','F')  NOT NULL,    
    hire_date   DATE            NOT NULL,
    PRIMARY KEY (emp_no)
);

INSERT INTO `employees_2` VALUES (10037,'1963-07-22','Pradeep','Makrucki','M','1990-12-05'),
(10038,'1960-07-20','Huan','Lortz','M','1989-09-20'),
(10039,'1959-10-01','Alejandro','Brender','M','1988-01-19'),
(10040,'1959-09-13','Weiyi','Meriste','F','1993-02-14'),
(10041,'1959-08-27','Uri','Lenart','F','1989-11-12'),
(10042,'1956-02-26','Magy','Stamatiou','F','1993-03-21'),
(10043,'1960-09-19','Yishay','Tzvieli','M','1990-10-20'),
(10044,'1961-09-21','Mingsen','Casley','F','1994-05-21'),
(10045,'1957-08-14','Moss','Shanbhogue','M','1989-09-02'),
(10046,'1960-07-23','Lucien','Rosenbaum','M','1992-06-20'),
(10047,'1952-06-29','Zvonko','Nyanchama','M','1989-03-31'),
(10048,'1963-07-11','Florian','Syrotiuk','M','1985-02-24'),
(10049,'1961-04-24','Basil','Tramer','F','1992-05-04'),
(10050,'1958-05-21','Yinghua','Dredge','M','1990-12-25'),
(10051,'1953-07-28','Hidefumi','Caine','M','1992-10-15'),
(10052,'1961-02-26','Heping','Nitsch','M','1988-05-21'),
(10053,'1954-09-13','Sanjiv','Zschoche','F','1986-02-04'),
(10054,'1957-04-04','Mayumi','Schueller','M','1995-03-13');


CREATE DATABASE emp_2;

USE emp_2;

CREATE TABLE employees_1 (
    emp_no      INT             NOT NULL,
    birth_date  DATE            NOT NULL,
    first_name  VARCHAR(14)     NOT NULL,
    last_name   VARCHAR(16)     NOT NULL,
    gender      ENUM ('M','F')  NOT NULL,    
    hire_date   DATE            NOT NULL,
    PRIMARY KEY (emp_no)
);


INSERT INTO `employees_1` VALUES  (10055,'1956-06-06','Georgy','Dredge','M','1992-04-27'),
(10056,'1961-09-01','Brendon','Bernini','F','1990-02-01'),
(10057,'1954-05-30','Ebbe','Callaway','F','1992-01-15'),
(10058,'1954-10-01','Berhard','McFarlin','M','1987-04-13'),
(10059,'1953-09-19','Alejandro','McAlpine','F','1991-06-26'),
(10060,'1961-10-15','Breannda','Billingsley','M','1987-11-02'),
(10061,'1962-10-19','Tse','Herber','M','1985-09-17'),
(10062,'1961-11-02','Anoosh','Peyn','M','1991-08-30'),
(10063,'1952-08-06','Gino','Leonhardt','F','1989-04-08'),
(10064,'1959-04-07','Udi','Jansch','M','1985-11-20'),
(10065,'1963-04-14','Satosi','Awdeh','M','1988-05-18'),
(10066,'1952-11-13','Kwee','Schusler','M','1986-02-26'),
(10067,'1953-01-07','Claudi','Stavenow','M','1987-03-04'),
(10068,'1962-11-26','Charlene','Brattka','M','1987-08-07'),
(10069,'1960-09-06','Margareta','Bierman','F','1989-11-05'),
(10070,'1955-08-20','Reuven','Garigliano','M','1985-10-14'),
(10071,'1958-01-21','Hisao','Lipner','M','1987-10-01'),
(10072,'1952-05-15','Hironoby','Sidou','F','1988-07-21'),
(10073,'1954-02-23','Shir','McClurg','M','1991-12-01'),
(10074,'1955-08-28','Mokhtar','Bernatsky','F','1990-08-13'),
(10075,'1960-03-09','Gao','Dolinsky','F','1987-03-19'),
(10076,'1952-06-13','Erez','Ritzmann','F','1985-07-09'),
(10077,'1964-04-18','Mona','Azuma','M','1990-03-02'),
(10078,'1959-12-25','Danel','Mondadori','F','1987-05-26'),
(10079,'1961-10-05','Kshitij','Gils','F','1986-03-27'),
(10080,'1957-12-03','Premal','Baek','M','1985-11-19'),
(10081,'1960-12-17','Zhongwei','Rosen','M','1986-10-30'),
(10082,'1963-09-09','Parviz','Lortz','M','1990-01-03'),
(10083,'1959-07-23','Vishv','Zockler','M','1987-03-31'),
(10084,'1960-05-25','Tuval','Kalloufi','M','1995-12-15');


CREATE TABLE employees_2(
    emp_no      INT             NOT NULL,
    birth_date  DATE            NOT NULL,
    first_name  VARCHAR(14)     NOT NULL,
    last_name   VARCHAR(16)     NOT NULL,
    gender      ENUM ('M','F')  NOT NULL,    
    hire_date   DATE            NOT NULL,
    PRIMARY KEY (emp_no)
);

INSERT INTO `employees_2` VALUES (10085,'1962-11-07','Kenroku','Malabarba','M','1994-04-09'),
(10086,'1962-11-19','Somnath','Foote','M','1990-02-16'),
(10087,'1959-07-23','Xinglin','Eugenio','F','1986-09-08'),
(10088,'1954-02-25','Jungsoon','Syrzycki','F','1988-09-02'),
(10089,'1963-03-21','Sudharsan','Flasterstein','F','1986-08-12'),
(10090,'1961-05-30','Kendra','Hofting','M','1986-03-14'),
(10091,'1955-10-04','Amabile','Gomatam','M','1992-11-18'),
(10092,'1964-10-18','Valdiodio','Niizuma','F','1989-09-22'),
(10093,'1964-06-11','Sailaja','Desikan','M','1996-11-05'),
(10094,'1957-05-25','Arumugam','Ossenbruggen','F','1987-04-18'),
(10095,'1965-01-03','Hilari','Morton','M','1986-07-15'),
(10096,'1954-09-16','Jayson','Mandell','M','1990-01-14'),
(10097,'1952-02-27','Remzi','Waschkowski','M','1990-09-15'),
(10098,'1961-09-23','Sreekrishna','Servieres','F','1985-05-13'),
(10099,'1956-05-25','Valter','Sullins','F','1988-10-18'),
(10100,'1953-04-21','Hironobu','Haraldson','F','1987-09-21'),
(10101,'1952-04-15','Perla','Heyers','F','1992-12-28'),
(10102,'1959-11-04','Paraskevi','Luby','F','1994-01-26'),
(10103,'1953-11-26','Akemi','Birch','M','1986-12-02'),
(10104,'1961-11-19','Xinyu','Warwick','M','1987-04-16'),
(10105,'1962-02-05','Hironoby','Piveteau','M','1999-03-23'),
(10106,'1952-08-29','Eben','Aingworth','M','1990-12-19'),
(10107,'1956-06-13','Dung','Baca','F','1994-03-22'),
(10108,'1952-04-07','Lunjin','Giveon','M','1986-10-02'),
(10109,'1958-11-25','Mariusz','Prampolini','F','1993-06-16'),
(10110,'1957-03-07','Xuejia','Ullian','F','1986-08-22'),
(10111,'1963-08-29','Hugo','Rosis','F','1988-06-19'),
(10112,'1963-08-13','Yuichiro','Swick','F','1985-10-08'),
(10113,'1963-11-13','Jaewon','Syrzycki','M','1989-12-24'),
(10114,'1957-02-16','Munir','Demeyer','F','1992-07-17'),
(10115,'1964-12-25','Chikara','Rissland','M','1986-01-23'),
(10116,'1955-08-26','Dayanand','Czap','F','1985-05-28');

Prepare the Doris table

create database test_db;
use test_db;
CREATE TABLE all_employees_info (
    emp_no       int NOT NULL,
    birth_date   date,
    first_name   varchar(20),
    last_name    varchar(20),
    gender       char(2),
    hire_date    date,
    database_name varchar(50),
    table_name    varchar(200)
)
UNIQUE KEY(`emp_no`, `birth_date`)
DISTRIBUTED BY HASH(`birth_date`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

Because the Doris table's UNIQUE KEY is (emp_no, birth_date), updating either of these two columns in MySQL leaves an extra row in Doris: the updated row arrives under a new key, so it no longer merges with the old one. A sketch of this follows.
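A minimal illustration of that behavior, assuming the sync job below is running (emp_no 10001 comes from the sample data; the Doris result shown is the expected outcome, not captured output):

-- MySQL side: birth_date is part of the Doris UNIQUE KEY
UPDATE emp_1.employees_1 SET birth_date = '1953-09-03' WHERE emp_no = 10001;

-- Doris side: the old key and the new key now coexist
SELECT emp_no, birth_date, first_name FROM test_db.all_employees_info WHERE emp_no = 10001;
-- expected: 10001  1953-09-02  Georgi
--           10001  1953-09-03  Georgi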

Code example

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.UUID;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000);
env.setParallelism(1);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// Register the MySQL CDC source table in the catalog.
tEnv.executeSql(
        "CREATE TABLE cdc_test_source (\n" +
                "  emp_no INT,\n" +
                "  birth_date DATE,\n" +
                "  first_name STRING,\n" +
                "  last_name STRING,\n" +
                "  gender STRING,\n" +
                "  hire_date  STRING,\n" +
                "  database_name STRING METADATA VIRTUAL,\n" +
                "  table_name STRING METADATA VIRTUAL,\n" +
                "  PRIMARY KEY (`emp_no`) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = '192.168.22.xxx',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'xxx',\n" +
                "  'password' = 'xxx',\n" +
                "  'database-name' = 'emp_[0-9]+',\n" +
                "  'table-name' = 'employees_[0-9]+'\n" +
                ")");

// UUID.randomUUID() returns a UUID, so convert it to a String for the label prefix.
String label = UUID.randomUUID().toString();
// Doris sink table.
tEnv.executeSql(
        "CREATE TABLE doris_test_sink (\n" +
                "  emp_no INT,\n" +
                "  birth_date STRING,\n" +
                "  first_name STRING,\n" +
                "  last_name STRING,\n" +
                "  gender STRING,\n" +
                "  hire_date  STRING\n" +
                ") " +
                "WITH (\n" +
                "  'connector' = 'doris',\n" +
                "  'fenodes' = '172.8.10.xxx:8030',\n" +
                "  'table.identifier' = 'test_db.all_employees_info',\n" +
                "  'username' = 'xxx',\n" +
                "  'password' = 'xxx',\n" +
        /* Doris stream load label: in the exactly-once scenario the label is globally
           unique and must be restarted from the latest checkpoint when restarting.
           Exactly-once semantics can be turned off via sink.enable-2pc. */
                "  'sink.label-prefix' = '" + label + "',\n" +
                "  'sink.properties.format' = 'json',\n" +              // JSON data format
                "  'sink.properties.read_json_by_line' = 'true'\n" +
                ")");

// Stream rows from the MySQL CDC source into the Doris sink.
tEnv.executeSql("INSERT INTO doris_test_sink SELECT emp_no, CAST(birth_date AS STRING) AS birth_date, first_name, last_name, gender, CAST(hire_date AS STRING) AS hire_date FROM cdc_test_source");

After the job is running, insert or update some rows in MySQL to verify the sync, as sketched below.
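For instance, a quick check might look like the following sketch (the emp_no 20001 row is made up for the test; with enableCheckpointing(10000) the change should reach Doris within roughly one 10-second checkpoint):

-- MySQL side: insert a new row, then update a non-key column
INSERT INTO emp_1.employees_1 VALUES (20001,'1990-01-01','Test','User','M','2020-01-01');
UPDATE emp_1.employees_1 SET first_name = 'Renamed' WHERE emp_no = 20001;

-- Doris side: a single merged row is expected, since the UNIQUE KEY did not change
SELECT emp_no, first_name FROM test_db.all_employees_info WHERE emp_no = 20001;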

Notes

Multi-database, multi-table sync hinges on:

  'database-name' = 'emp_[0-9]+',
  'table-name' = 'employees_[0-9]+'

The regular expressions automatically match every qualifying database and table.

The database and table names come from the metadata columns:

  database_name STRING METADATA VIRTUAL,
  table_name STRING METADATA VIRTUAL,

Without these declarations, the MySQL database and table name of each row cannot be read.

The column type mapping between Doris and Flink is documented on the official site:

Flink Doris Connector - Apache Doris
