Flink MySQL CDC Usage Summary

Posted by 董可伦


Preface

This post summarizes my study of Flink MySQL CDC, whose main purpose is synchronizing MySQL data to other data sources such as Hudi and MySQL. The examples below validate two scenarios: MySQL-to-Hudi (MySQL2Hudi) and MySQL-to-MySQL (MySQL2MySQL).

Versions

Flink: 1.14.3 / 1.15.4 / 1.16.1
Hudi: 0.13.0
MySQL CDC: 2.3.0

Installation

Copy the required connector jars into flink/lib (using Flink 1.15.4 as an example).

Flink CDC only applies to source tables; the MySQL CDC connector, for example, reads a MySQL source table. See the official documentation (https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#) for the data sources Flink CDC currently supports; each data source requires downloading its corresponding jar.

MySQL CDC Parameters

CREATE TABLE mysql_cdc_source (
  id int PRIMARY KEY NOT ENFORCED, -- a primary key is required, and the source table must have one; the Flink primary key need not match the MySQL primary key
  name string,
  price double,
  ts bigint,
  dt string
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '19.168.44.128',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root-123',
    'database-name' = 'cdc',
    'table-name' = 'mysql_cdc_source'
);

To use the MySQL CDC source, you must first enable MySQL binlog. For the other parameters and further details, see the official documentation: https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc%28ZH%29.html#id6

Examples

Create the MySQL source physical table

mysql -uroot -proot-123 cdc
CREATE TABLE `mysql_cdc_source` (
  `id` int(11) NOT NULL,
  `name` text,
  `price` double DEFAULT NULL,
  `ts` int(11) DEFAULT NULL,
  `dt` text,
  `insert_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Generate test data

insert into mysql_cdc_source(id,name,price,ts,dt) values (1,'hudi1',1.1,1000,'20230331');
insert into mysql_cdc_source(id,name,price,ts,dt) values (2,'hudi2',2.2,2000,'20230331');
......
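For larger tests, insert statements following the pattern above can be generated with a short script. This is only a sketch; `make_inserts` is a hypothetical helper, and the value pattern (price = id * 1.1, ts = id * 1000) just extends the two rows shown above.

```python
# Generate bulk INSERT statements for the mysql_cdc_source table defined above.
def make_inserts(n, dt="20230331"):
    rows = []
    for i in range(1, n + 1):
        rows.append(
            "insert into mysql_cdc_source(id,name,price,ts,dt) "
            f"values ({i},'hudi{i}',{round(i * 1.1, 1)},{i * 1000},'{dt}');"
        )
    return rows

for stmt in make_inserts(3):
    print(stmt)
```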

CDC MySQL2Hudi

set yarn.application.name=cdc_mysql2hudi;

set parallelism.default=1;
set taskmanager.memory.process.size=3g;


set execution.checkpointing.interval=10000; 
set state.checkpoints.dir=hdfs:///flink/checkpoints/cdc_mysql2hudi;
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;

CREATE TABLE mysql_cdc_source (
  id int PRIMARY KEY NOT ENFORCED, -- a primary key is required, and the source table must have one; the Flink primary key need not match the MySQL primary key
  name string,
  price double,
  ts bigint,
  dt string
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '19.168.44.128',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root-123',
    'database-name' = 'cdc',
    'table-name' = 'mysql_cdc_source'
);

CREATE TABLE hudi_cdc_sink (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts bigint,
  dt string
)
WITH (
  'connector' = 'hudi',
  'path' = '/tmp/cdc/hudi_cdc_sink',
  'write.operation'='upsert', -- write operation type, optional
  'write.tasks'='1', -- write parallelism, optional
  'table.type'='COPY_ON_WRITE', -- table type, optional
  'precombine.field' = 'ts', -- optional; precombine/ordering field: an incoming record updates the stored one only when this field is greater than the stored value; defaults to ts if that column exists; omit if there is no such column; recommended to set it to update_time
  'hoodie.datasource.write.recordkey.field' = 'id', -- optional; same effect as PRIMARY KEY, at least one of the two is required
  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator', -- this parameter has a bug in the current version
  'index.type' =  'BUCKET', -- Flink supports only two index types; the default state index can cause TaskManager GC OOM when the data volume is large
  'hoodie.bucket.index.num.buckets' = '16', -- number of buckets
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.conf.dir'='/usr/hdp/3.1.0.0-78/hive/conf',
  'hive_sync.db' = 'cdc',
  'hive_sync.table' = 'hudi_cdc_sink',
  'hoodie.datasource.hive_sync.create_managed_table' = 'true' -- whether to create a Hive managed (internal) table; supported since 0.13.0
);
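The BUCKET index configured above assigns every record key to a fixed bucket, so upserts can locate existing records without the per-key state that the default state index keeps in TaskManager memory. A minimal sketch of the idea follows; the hash function here is illustrative, not Hudi's actual implementation.

```python
NUM_BUCKETS = 16  # matches 'hoodie.bucket.index.num.buckets' above

def bucket_of(record_key: str, num_buckets: int = NUM_BUCKETS) -> int:
    # Stable string hash so the same key always lands in the same bucket.
    # Hudi uses its own hashing internally; this is only illustrative.
    h = 0
    for ch in record_key:
        h = (h * 31 + ord(ch)) & 0x7FFFFFFF
    return h % num_buckets

# The same key always maps to the same bucket, so an upsert can go
# straight to one bucket's files instead of probing an in-memory index.
assert bucket_of("1") == bucket_of("1")
assert 0 <= bucket_of("42") < NUM_BUCKETS
```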

insert into hudi_cdc_sink select * from mysql_cdc_source;

Note: the source and sink table columns must be in the same order.
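The precombine rule described in the table options (an incoming record replaces the stored one only when its precombine field, here ts, is not older) can be sketched as below. This is a simplified model, not Hudi's actual payload implementation; in this sketch ties go to the incoming record.

```python
# Simplified upsert with a precombine field, mirroring the
# 'precombine.field' = 'ts' semantics described above.
def upsert(table: dict, record: dict, key="id", precombine="ts"):
    existing = table.get(record[key])
    # Keep the incoming record only if there is no existing row, or its
    # precombine value is at least the stored one (ties go to incoming here).
    if existing is None or record[precombine] >= existing[precombine]:
        table[record[key]] = record
    return table

t = {}
upsert(t, {"id": 1, "name": "hudi1", "ts": 1000})
upsert(t, {"id": 1, "name": "hudi1-new", "ts": 2000})   # newer ts wins
upsert(t, {"id": 1, "name": "hudi1-stale", "ts": 500})  # stale ts ignored
assert t[1]["name"] == "hudi1-new"
```

This is why the comment above recommends pointing precombine.field at update_time: out-of-order or replayed changes then cannot overwrite a newer row with an older one.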

CDC MySQL2MySQL

Create the MySQL sink physical table

CREATE TABLE `test_sink_mysql` (
  `id` int(11) NOT NULL,
  `name` text,
  `price` double DEFAULT NULL,
  `ts` int(11) DEFAULT NULL,
  `dt` text,
  `insert_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

set yarn.application.name=cdc_mysql2mysql;

set parallelism.default=1;
set taskmanager.memory.process.size=3g;

set execution.checkpointing.interval=10000; 
set state.checkpoints.dir=hdfs:///flink/checkpoints/cdc_mysql2mysql;
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;

CREATE TABLE mysql_cdc_source (
  id int PRIMARY KEY NOT ENFORCED, -- a primary key is required, and the source table must have one; the Flink primary key need not match the MySQL primary key
  name string,
  price double,
  ts bigint,
  dt string
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '19.168.44.128',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root-123',
    'database-name' = 'cdc',
    'table-name' = 'mysql_cdc_source'
);

create table test_sink_mysql (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price double,
  ts bigint,
  dt string
) with (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://19.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
 'username' = 'root',
 'password' = 'root-123',
 'table-name' = 'test_sink_mysql',
 'sink.buffer-flush.max-rows' = '1000000'
);
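The 'sink.buffer-flush.max-rows' option above makes the JDBC sink accumulate rows and write them in batches (together with rewriteBatchedStatements=true in the URL) rather than row by row. A rough sketch of that buffering follows; BufferedSink is a hypothetical class name, not the connector's actual implementation.

```python
# Sketch of a row buffer that flushes once max_rows accumulate,
# loosely mirroring the JDBC sink's 'sink.buffer-flush.max-rows' option.
class BufferedSink:
    def __init__(self, max_rows: int):
        self.max_rows = max_rows
        self.buffer = []
        self.flushed_batches = []  # stands in for batched JDBC writes

    def write(self, row):
        self.buffer.append(row)
        if len(self.buffer) >= self.max_rows:
            self.flush()

    def flush(self):
        if self.buffer:
            # In a real sink this would execute one batched INSERT/UPSERT.
            self.flushed_batches.append(list(self.buffer))
            self.buffer.clear()

sink = BufferedSink(max_rows=3)
for i in range(7):
    sink.write({"id": i})
sink.flush()  # a checkpoint or job close also forces a flush
assert [len(b) for b in sink.flushed_batches] == [3, 3, 1]
```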

insert into test_sink_mysql(id,name,price,ts,dt) select * from mysql_cdc_source;

Verification

1. Run insert/update/delete against the source table mysql_cdc_source and check the sink table: data is synchronized in real time and stays consistent.
2. Pick a reasonably large source table, kill the job partway through the historical (full) sync, then restore it from a checkpoint to verify that the full sync resumes from where it left off.
3. A truncate on the source table is not propagated to the sink table.
4. Other checks…
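The verification results above can be mimicked in miniature: CDC replication applies row-level insert/update/delete events (the +I/+U/-D markers below follow Flink's RowKind notation), while TRUNCATE produces no binlog row events and is therefore never seen by the sink. A toy sketch, with hypothetical names:

```python
# Toy replica that applies CDC-style row events; TRUNCATE emits no
# row events in the binlog, so the replica never sees it.
def apply_event(replica: dict, op: str, row: dict):
    if op in ("+I", "+U"):      # insert / update-after image
        replica[row["id"]] = row
    elif op == "-D":            # delete
        replica.pop(row["id"], None)
    return replica

r = {}
apply_event(r, "+I", {"id": 1, "name": "hudi1"})
apply_event(r, "+U", {"id": 1, "name": "hudi1-upd"})
apply_event(r, "-D", {"id": 1, "name": "hudi1-upd"})
assert r == {}
```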

Related Reading

Hands-on: ingesting MySQL into Doris via Flink CDC

1. Create the MySQL database and table, and write demo data

  1. Log in to the test MySQL instance
 mysql -u root -pnew_password
  2. Create the database and table, and write the demo data
CREATE DATABASE emp_1;
USE emp_1;
CREATE TABLE employees_1 (
    emp_no      INT             NOT NULL,
    birth_date  DATE            NOT NULL,
    first_name  VARCHAR(14)     NOT NULL,
    last_name   VARCHAR(16)     NOT NULL,
    gender      ENUM ('M','F')  NOT NULL,    
    hire_date   DATE            NOT NULL,
    PRIMARY KEY (emp_no)
);

INSERT INTO `employees_1` VALUES (10001,'1953-09-02','Georgi','Facello','M','1986-06-26'),
(10002,'1964-06-02','Bezalel','Simmel','F','1985-11-21'),
(10036,'1959-08-10','Adamantios','Portugali','M','1992-01-03');

Note: MySQL must have binlog enabled:

  • log_bin=mysql_bin
  • binlog-format=Row
  • server-id=1

2. Create the Doris database and table

  1. Create the Doris table
mysql -uroot -P9030 -h127.0.0.1
create database demo;
use demo;
CREATE TABLE all_employees_info (
    emp_no       int NOT NULL,
    birth_date   date,
    first_name   varchar(20),
    last_name    varchar(20),
    gender       char(2),
    hire_date    date
)
UNIQUE KEY(`emp_no`, `birth_date`)
DISTRIBUTED BY HASH(`birth_date`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
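Under the Doris UNIQUE KEY model declared above, rows sharing the key columns (emp_no, birth_date) are deduplicated, with later-loaded rows replacing earlier ones, which is what makes the CDC upserts idempotent on the Doris side. A rough sketch of that semantics (not Doris's actual merge implementation):

```python
# Sketch of Doris UNIQUE KEY semantics: rows with the same key columns
# are merged, later-loaded rows replacing earlier ones.
def load(table: dict, rows, key_cols=("emp_no", "birth_date")):
    for row in rows:
        table[tuple(row[c] for c in key_cols)] = row
    return table

t = {}
load(t, [{"emp_no": 10001, "birth_date": "1953-09-02", "first_name": "Georgi"}])
load(t, [{"emp_no": 10001, "birth_date": "1953-09-02", "first_name": "Georgi2"}])
assert len(t) == 1 and t[(10001, "1953-09-02")]["first_name"] == "Georgi2"
```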

3. Start Flink

  1. Start Flink
cd /mnt/apps/flink-1.15.3/
# start Flink (the service is already running here)
bin/start-cluster.sh
# enter the SQL client
bin/sql-client.sh embedded
  2. Create the Flink job:
SET 'execution.checkpointing.interval' = '10s';

CREATE TABLE employees_source (
    database_name STRING METADATA VIRTUAL,
    table_name STRING METADATA VIRTUAL,
    emp_no int NOT NULL,
    birth_date date,
    first_name STRING,
    last_name STRING,
    gender STRING,
    hire_date date,
    PRIMARY KEY (`emp_no`) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = 'new_password',
    'database-name' = 'emp_1',
    'table-name' = 'employees_1'
  );

CREATE TABLE cdc_doris_sink (
    emp_no       int ,
    birth_date   STRING,
    first_name   STRING,
    last_name    STRING,
    gender       STRING,
    hire_date    STRING
) 
WITH (
  'connector' = 'doris',
  'fenodes' = '172.16.64.9:8030',
  'table.identifier' = 'demo.all_employees_info',
  'username' = 'root',
  'password' = '',
  'sink.properties.two_phase_commit'='true',
  'sink.label-prefix'='doris_demo_emp_002'
);

insert into cdc_doris_sink (emp_no,birth_date,first_name,last_name,gender,hire_date) 
select emp_no,cast(birth_date as string) as birth_date ,first_name,last_name,gender,cast(hire_date as string) as hire_date  from employees_source;
  3. Open the following URL to view the Flink job:
    http://localhost:8081/#/job/running

  4. Data verification: once the job is running, you can see data flowing into Doris in real time

mysql -uroot -P9030 -h127.0.0.1
mysql> select * from all_employees_info;
+--------+------------+------------+-----------+--------+------------+
| emp_no | birth_date | first_name | last_name | gender | hire_date  |
+--------+------------+------------+-----------+--------+------------+
|  10001 | 1953-09-02 | Georgi     | Facello   | M      | 1986-06-26 |
|  10002 | 1964-06-02 | Bezalel    | Simmel    | F      | 1985-11-21 |
|  10036 | 1959-08-10 | Adamantios | Portugali | M      | 1992-01-03 |
|  20001 | 1953-09-02 | Georgi     | Facello   | M      | 1986-06-26 |
+--------+------------+------------+-----------+--------+------------+
4 rows in set (0.02 sec)

Links

  • https://zhuanlan.zhihu.com/p/532913664
  • https://www.runoob.com/mysql/mysql-install.html
  • https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.15/1.2.1/
  • https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/

Jar download links:

Flink environment: 1.15.3

  • https://dlcdn.apache.org/flink/flink-1.15.3/flink-1.15.3-bin-scala_2.12.tgz
    Extract the archive and place the jars below in Flink's lib directory.
    flink-doris-connector: 1.15
  • https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.15/1.2.1/flink-doris-connector-1.15-1.2.1.jar
    mysql cdc: flink-sql-connector-mysql-cdc-2.2.1.jar
  • https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar

The above covers the main content of this Flink MySQL CDC usage summary. For more on the topic, see the following articles:

ETL from Oracle to MySQL with Flink CDC 2.2.1

Reading MySQL data with Flink CDC

Flink CDC MySQL2Doris case study

Reading MySQL with Flink CDC and importing into Hudi

Hands-on: ingesting MySQL into Doris via Flink CDC

Flink in practice series: syncing MySQL full and incremental data to Hudi in real time with Flink CDC