Big Data (9j): Flink CDC
Posted by 小基基o_O
CDC Overview
- CDC stands for Change Data Capture.
- It monitors and captures changes in a database (inserts, updates, and deletes of data or tables, etc.) and writes them, complete and in order of occurrence, to a message queue.
CDC type | Query-based | Binlog-based |
---|---|---|
Tools | Sqoop | Canal, Maxwell, Debezium |
Processing mode | Batch | Streaming |
Captures every change | No | Yes |
Latency | High | Low |
Extra load on the database | Yes | No |
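To make the contrast concrete, below is a minimal sketch (ours, not part of the Flink setup that follows) of query-based CDC with plain JDBC, polling the test table db1.t defined later in this post. Because each poll only sees the latest committed state, intermediate updates and deletes between polls are lost, which is exactly the "captures every change: No" row above:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;

public class QueryBasedCdcSketch {
    public static void main(String[] args) throws Exception {
        // assumes column c is a change-tracking timestamp we can filter on
        Timestamp lastSeen = Timestamp.valueOf("1970-01-01 00:00:01");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/db1", "root", "<password>")) {
            while (true) {
                try (PreparedStatement ps = conn.prepareStatement(
                        "SELECT a, b, c FROM t WHERE c > ? ORDER BY c")) {
                    ps.setTimestamp(1, lastSeen);
                    try (ResultSet rs = ps.executeQuery()) {
                        while (rs.next()) {
                            // only the latest row state is visible; deletes and
                            // overwritten intermediate versions never show up here
                            lastSeen = rs.getTimestamp("c");
                            System.out.printf("a=%d b=%s c=%s%n",
                                    rs.getInt("a"), rs.getString("b"), lastSeen);
                        }
                    }
                }
                Thread.sleep(5000); // batch-style polling, hence the high latency
            }
        }
    }
}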
Flink CDC
- The Flink community developed and open-sourced the flink-cdc-connectors component, which reads both full snapshots and incremental change data directly from databases such as MySQL and PostgreSQL.
Testing Flink CDC in Code
1. Enable the MySQL 8 binlog
1) Edit the MySQL configuration
vim /etc/my.cnf
2) Add the following
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=db1
Parameter | Description |
---|---|
server-id | Unique ID of each instance in a MySQL replication setup |
log-bin | Prefix of the generated binlog files |
binlog_format | Format of the binlog |
binlog-do-db | Databases to write to the binlog; if unset, all databases are logged |
binlog_format value | Description | Space usage | Data consistency |
---|---|---|---|
statement | Statement-level: logs each executed write statement | Small | Replaying the binlog at a different time can produce different data (e.g., statements that call NOW()) |
row | Row-level: logs the resulting state of every changed row | Large | Fully consistent |
mixed | An upgraded statement mode that switches to row for unsafe statements | Small | Can still produce inconsistencies in edge cases |
3) Restart MySQL
sudo systemctl restart mysqld
4) Verify that the configuration took effect
mysql -uroot -e'SHOW variables LIKE "%log_bin%"' -p
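If the binlog is on, the output should include a log_bin row with value ON, roughly like this (the exact variable list and paths vary by MySQL version and installation):

+---------------------------------+-----------------------------+
| Variable_name                   | Value                       |
+---------------------------------+-----------------------------+
| log_bin                         | ON                          |
| log_bin_basename                | /var/lib/mysql/mysql-bin    |
| ...                             | ...                         |
+---------------------------------+-----------------------------+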
2. Prepare the MySQL data
1) Data in the monitored MySQL database
DROP DATABASE IF EXISTS db1;
CREATE DATABASE db1;
CREATE TABLE db1.t
(a INT PRIMARY KEY,b VARCHAR(255),c TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
INSERT db1.t(a,b,c) VALUES (2,'ab','2022-10-24 00:00:00');
2) Change data (run these only during the test)
INSERT db1.t(a,b) VALUES (3,'bc');
UPDATE db1.t SET a=2,b='cd' WHERE a=2;
SELECT * FROM db1.t;
3. Set up the development environment
Windows 10 + JDK 1.8 + IDEA 2021; create a Maven project and add the following dependencies to pom.xml
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.6</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.13.6</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.13.6</version>
</dependency>
<!-- FlinkCDC -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.1.0</version>
</dependency>
<!-- FlinkSQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.13.6</version>
</dependency>
<!-- flink-table dependency; fixes the error below:
Caused by: java.lang.ClassNotFoundException:
org.apache.flink.connector.base.source.reader.RecordEmitter -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>1.13.6</version>
</dependency>
<!-- JSON processing -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.19</version>
</dependency>
</dependencies>
4. Java code
4.1 DataStream-style example
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkStreamCDC {
    public static void main(String[] args) throws Exception {
        // TODO 1: create the stream execution environment and set the parallelism
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        // TODO 2: create the Flink MySQL CDC source
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("<hostname>")
                .port(3306)
                .username("root")
                .password("<password>")
                // databases to capture
                .databaseList("db1")
                // tables to capture (the database prefix must not be omitted)
                .tableList("db1.t")
                // deserialize the received SourceRecords into JSON strings
                .deserializer(new JsonDebeziumDeserializationSchema())
                // startup options:
                // initial: take an initial snapshot of the monitored tables, then keep reading the latest binlog
                // earliest: read from the beginning of the binlog
                // latest: read from the end of the binlog
                // specificOffset and timestamp also exist; see the source code for details
                .startupOptions(StartupOptions.initial())
                .build();
        // TODO 3: read the data and print it
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "sourceName").print();
        // TODO 4: execute
        env.execute();
    }
}
Test output
"before":null,"after":"a":2,"b":"ab","c":"2022-10-24T00:00:00Z","source":"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1669993379274,"snapshot":"false","db":"db1","sequence":null,"table":"t","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null,"op":"r","ts_ms":1669993379280,"transaction":null
//执行DML后
"before":null,"after":"a":3,"b":"bc","c":"2022-12-02T15:09:55Z","source":"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1669993795000,"snapshot":"false","db":"db1","sequence":null,"table":"t","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":1851,"row":0,"thread":null,"query":null,"op":"c","ts_ms":1669993794980,"transaction":null
"before":"a":2,"b":"ab","c":"2022-10-23T16:00:00Z","after":"a":2,"b":"cd","c":"2022-10-23T16:00:00Z","source":"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1669993795000,"snapshot":"false","db":"db1","sequence":null,"table":"t","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":2145,"row":0,"thread":null,"query":null,"op":"u","ts_ms":1669993794987,"transaction":null
4.2 Flink SQL example
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSqlCdc {
    public static void main(String[] args) throws Exception {
        // TODO 1: prepare the environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // TODO 2: create the dynamic table
        tableEnv.executeSql("CREATE TABLE t (" +
                " a INT," +
                " b STRING," +
                " c TIMESTAMP," +
                " PRIMARY KEY(a) NOT ENFORCED" +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'hostname' = '<hostname>'," +
                " 'port' = '3306'," +
                " 'username' = 'root'," +
                " 'password' = '<password>'," +
                " 'database-name' = 'db1'," +
                " 'table-name' = 't'" +
                ")");
        tableEnv.executeSql("SELECT * FROM t").print();
        // TODO 3: execute
        env.execute();
    }
}
Test output
+----+----+-----+----------------------------+
| op | a | b | c |
+----+----+-----+----------------------------+
| +I | 2 | ab | 2022-10-24 00:00:00.000000 |
// after running the test DML
| +I | 3 | bc | 2022-12-02 15:15:14.000000 |
| -U | 2 | ab | 2022-10-23 16:00:00.000000 |
| +U | 2 | cd | 2022-10-23 16:00:00.000000 |
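Note that executeSql("SELECT ...").print() blocks the main thread on an unbounded stream, so env.execute() is never actually reached in the example above. If you want the rows back in the DataStream world instead, Flink 1.13 can convert the dynamic table into a changelog stream; a sketch (ours, not in the original post) replacing the print() line:

// requires: import org.apache.flink.table.api.Table;
Table result = tableEnv.sqlQuery("SELECT * FROM t");
// DataStream<Row> carrying the same +I/-U/+U/-D changelog rows as the table above
tableEnv.toChangelogStream(result).print();
env.execute(); // now needed, since the pipeline runs as a DataStream job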
JSON output format at a glance (StartupOptions.initial)
Database operation | op | before | after |
---|---|---|---|
insert | c | null | row data |
update | u (d followed by c if the primary key changes) | row data | row data |
delete | d | row data | null |
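The pom above pulls in fastjson, but neither example actually uses it. As a minimal sketch (the class is ours; it assumes the JSON layout shown in the DataStream test output) of extracting the fields in this table:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class CdcEventParser {
    // condenses one Debezium-style change event into a one-line summary
    public static String summarize(String json) {
        JSONObject event = JSON.parseObject(json);
        String op = event.getString("op");                  // r / c / u / d
        JSONObject before = event.getJSONObject("before");  // null for r and c
        JSONObject after = event.getJSONObject("after");    // null for d
        return "op=" + op + " before=" + before + " after=" + after;
    }
}

It could be hooked into the DataStream example as env.fromSource(...).map(CdcEventParser::summarize).print().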
1) Initial snapshot of the monitored tables
-- create the database
DROP DATABASE IF EXISTS db1;CREATE DATABASE db1;
-- create the table
CREATE TABLE db1.t(a INT PRIMARY KEY,b TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
-- insert a row
INSERT db1.t(a,b) VALUES (1,'2022-10-24 00:00:00');
JSON
{
  "before": null,
  "after": {
    "a": 1,