Big Data (9j): FlinkCDC

Posted 小基基o_O


CDC Overview

  • Change Data Capture
    Monitors and captures changes to a database (inserts, updates, and deletes of rows or tables) and writes them, in the order they occurred, completely to a message broker.

Types of CDC:

|  | Query-based | Binlog-based |
| --- | --- | --- |
| Tools | Sqoop | Canal, Maxwell, Debezium |
| Processing mode | Batch | Streaming |
| Can it capture every change? | No | Yes |
| Latency | High | Low |
| Extra load on the database? | Yes | No |
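The contrast between the two approaches can be shown with a toy, dependency-free sketch (this is not real CDC code; the row values are made up): a binlog-style consumer replays every change in order, while a query-style poll only sees whatever state the table happens to have at poll time.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy illustration (not real CDC): a row is inserted, updated twice, then
// deleted. Binlog-based CDC captures every change in order; a query-based
// poll taken after the delete sees no trace of the row at all.
public class CdcStyles {
    public static void main(String[] args) {
        List<String> binlog = new ArrayList<>();
        Map<Integer, String> table = new HashMap<>();

        // Apply four changes, recording each one in the "binlog".
        table.put(1, "ab");    binlog.add("c a=1 b=ab");
        table.put(1, "cd");    binlog.add("u a=1 b=cd");
        table.put(1, "ef");    binlog.add("u a=1 b=ef");
        table.remove(1);       binlog.add("d a=1");

        System.out.println("binlog-based captures " + binlog.size() + " changes");
        System.out.println("query-based poll sees " + table.size() + " rows");
    }
}
```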

Flink-CDC

  • The Flink community developed and open-sourced the flink-cdc-connectors component,
    which can read full and incremental change data directly from databases such as MySQL and PostgreSQL.

Comparing the Maxwell and Flink-CDC approaches:

| Approach | Data path |
| --- | --- |
| Maxwell | MySQL binlog → Kafka → Flink |
| Flink-CDC | MySQL binlog → Flink |

Flink-CDC Code Test

1. Enable the MySQL 8 Binlog

1. Edit the MySQL configuration

vim /etc/my.cnf

2. Add the following

server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=db1
Parameter notes:

| Parameter | Meaning |
| --- | --- |
| server-id | In MySQL replication, each instance must have a unique ID |
| log-bin | Prefix of the generated binlog files |
| binlog_format | Format of the binlog |
| binlog-do-db | Databases whose changes are written to the binlog; if unset, all databases are logged |

| binlog_format value | Description | Space usage | Data consistency |
| --- | --- | --- | --- |
| statement | Statement level: records each executed write statement | Small | Replaying the binlog for recovery at a different time may produce inconsistent data |
| row | Row level: records the change to every affected row | Large | Fully consistent |
| mixed | An upgraded version of statement | Small | Can still produce inconsistent data in extreme cases |

3. Restart MySQL

sudo systemctl restart mysqld

4. Check that the configuration took effect

mysql -uroot -e'SHOW variables LIKE "%log_bin%"' -p

2. Prepare MySQL Data

1. MySQL data to be monitored

DROP DATABASE IF EXISTS db1;
CREATE DATABASE db1;
CREATE TABLE db1.t
(a INT PRIMARY KEY,b VARCHAR(255),c TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
INSERT db1.t(a,b,c) VALUES (2,'ab','2022-10-24 00:00:00');

2. Insert and update data (run only during the test)

INSERT db1.t(a,b) VALUES (3,'bc');
UPDATE db1.t SET a=2,b='cd' WHERE a=2;
SELECT * FROM db1.t;

3. Set Up the Development Environment

Windows 10 + JDK 1.8 + IDEA 2021; create a Maven project and add the following dependencies to pom.xml:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.13.6</version>
    </dependency>
    <!-- FlinkCDC -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.1.0</version>
    </dependency>
    <!-- FlinkSQL -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.13.6</version>
    </dependency>
    <!-- flink-table related dependency; fixes this error:
    Caused by: java.lang.ClassNotFoundException:
    org.apache.flink.connector.base.source.reader.RecordEmitter -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
        <version>1.13.6</version>
    </dependency>
    <!-- JSON processing -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.19</version>
    </dependency>
</dependencies>

4. Java Code

4.1 DataStream-style example

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkStreamCDC {
    public static void main(String[] args) throws Exception {
        //TODO 1 Create the stream environment and set the parallelism
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        //TODO 2 Create the Flink-MySQL-CDC source
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("host-address")
                .port(3306)
                .username("root")
                .password("password")
                // Databases to capture
                .databaseList("db1")
                // Tables to capture (the database prefix cannot be omitted)
                .tableList("db1.t")
                // Deserialize each received SourceRecord into a JSON string
                .deserializer(new JsonDebeziumDeserializationSchema())
                // Startup strategy:
                //   initial: take an initial snapshot of the monitored tables, then keep reading the latest binlog
                //   earliest: read from the beginning of the binlog
                //   latest: read from the end of the binlog
                //   specificOffset and timestamp also exist; see the source code for details
                .startupOptions(StartupOptions.initial())
                .build();
        //TODO 3 Read the data and print it
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "sourceName").print();
        //TODO 4 Execute
        env.execute();
    }
}

Printed test output

"before":null,"after":"a":2,"b":"ab","c":"2022-10-24T00:00:00Z","source":"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1669993379274,"snapshot":"false","db":"db1","sequence":null,"table":"t","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null,"op":"r","ts_ms":1669993379280,"transaction":null

//执行DML后

"before":null,"after":"a":3,"b":"bc","c":"2022-12-02T15:09:55Z","source":"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1669993795000,"snapshot":"false","db":"db1","sequence":null,"table":"t","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":1851,"row":0,"thread":null,"query":null,"op":"c","ts_ms":1669993794980,"transaction":null
"before":"a":2,"b":"ab","c":"2022-10-23T16:00:00Z","after":"a":2,"b":"cd","c":"2022-10-23T16:00:00Z","source":"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1669993795000,"snapshot":"false","db":"db1","sequence":null,"table":"t","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":2145,"row":0,"thread":null,"query":null,"op":"u","ts_ms":1669993794987,"transaction":null

4.2 FlinkSQL example

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSqlCdc {
    public static void main(String[] args) throws Exception {
        // TODO 1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // TODO 2. Create the dynamic table
        tableEnv.executeSql("CREATE TABLE t (" +
                " a INT," +
                " b STRING," +
                " c TIMESTAMP," +
                " PRIMARY KEY(a) NOT ENFORCED" +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'hostname' = 'host-address'," +
                " 'port' = '3306'," +
                " 'username' = 'root'," +
                " 'password' = 'password'," +
                " 'database-name' = 'db1'," +
                " 'table-name' = 't'" +
                ")");
        // TODO 3. Query and print; executeSql submits the job itself, so a
        // separate env.execute() is not needed (with no DataStream operators
        // defined it would fail with "No operators defined in streaming topology")
        tableEnv.executeSql("SELECT * FROM t").print();
    }
}

Printed test output

+----+----+-----+----------------------------+
| op |  a |   b |                          c |
+----+----+-----+----------------------------+
| +I |  2 |  ab | 2022-10-24 00:00:00.000000 |

// after executing the DML

| +I |  3 |  bc | 2022-12-02 15:15:14.000000 |
| -U |  2 |  ab | 2022-10-23 16:00:00.000000 |
| +U |  2 |  cd | 2022-10-23 16:00:00.000000 |

Overview of the JSON test output (StartupOptions.initial)

| Database operation | op | before | after |
| --- | --- | --- | --- |
| insert | c | null | row data |
| update | u (an update that changes the primary key appears as d followed by c) | row data before | row data after |
| delete | d | row data | null |
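The op codes in the table (plus the `r` code seen in the initial-snapshot output) can be mapped back to database operations with a simple switch; the class and method names below are made up for illustration:

```java
// Map a Debezium op code to the database operation it represents,
// following the table above.
public class OpMeaning {
    static String describe(String op) {
        switch (op) {
            case "r": return "snapshot read";
            case "c": return "insert";
            case "u": return "update";
            case "d": return "delete";
            default:  return "unknown";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe("c") + ", " + describe("u") + ", " + describe("d"));
    }
}
```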

1. Initial snapshot of the monitored tables

-- Create the database
DROP DATABASE IF EXISTS db1;CREATE DATABASE db1;
-- Create the table
CREATE TABLE db1.t(a INT PRIMARY KEY,b TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
-- Insert a row
INSERT db1.t(a,b) VALUES (1,'2022-10-24 00:00:00');

JSON


	"before": null,
	"after": 
		"a": 1,
		以上是关于大数据(9j)FlinkCDC的主要内容,如果未能解决你的问题,请参考以下文章
