Flink CDC 读取MySQL的数据
Posted 一个肉团子
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink CDC 读取MySQL的数据相关的知识,希望对你有一定的参考价值。
1、前提背景准备
Flink在1.11之后就已经支持从mysql增量读取Binlog日志的方式。
pom文件如下:
<properties> <scala.binary.version>2.11</scala.binary.version> <scala.version>2.11.12</scala.version> <flink.version>1.12.0</flink.version> <fastjson.verson>1.2.72</fastjson.verson> <lombok.version>1.18.6</lombok.version> <kafka.version>2.3.0</kafka.version> </properties> <dependencies> <dependency> <groupId>com.github.shyiko</groupId> <artifactId>mysql-binlog-connector-java</artifactId> <version>0.21.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>$kafka.version</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>$flink.version</version> <exclusions> <exclusion> <groupId>log4j</groupId> <artifactId>*</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>$lombok.version</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>$fastjson.verson</version> </dependency> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>flink-connector-mysql-cdc</artifactId> <version>1.4.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>$flink.version</version> <!--<scope>provided</scope>--> <exclusions> <exclusion> <groupId>log4j</groupId> <artifactId>*</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_$scala.binary.version</artifactId> <version>$flink.version</version> <!--<scope>provided</scope>--> <exclusions> <exclusion> <groupId>log4j</groupId> <artifactId>*</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>com.google.code.findbugs</groupId> <artifactId>jsr305</artifactId> </exclusion> <exclusion> <groupId>org.apache.flink</groupId> <artifactId>force-shading</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_$scala.binary.version</artifactId> <version>$flink.version</version> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_$scala.binary.version</artifactId> <version>$flink.version</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_$scala.binary.version</artifactId> <version>$flink.version</version> <exclusions> <exclusion> <groupId>log4j</groupId> <artifactId>*</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> </dependencies>
2、全量读取某个数据库中的所有库中的所有表的Binlog方式代码如下:
public static void main(String[] args) throws Exception
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
// 配置 Debezium在初始化快照的时候(扫描历史数据的时候) =》 不要锁表
properties.setProperty("debezium.snapshot.locking.mode", "none");
env.enableCheckpointing(TimeUnit.SECONDS.toMillis(5));
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
//最大同时存在的ck数 和设置的间隔时间有一个就行
checkpointConfig.setMaxConcurrentCheckpoints(1);
//超时时间
checkpointConfig.setCheckpointTimeout(TimeUnit.SECONDS.toMillis(5));
//2.3 指定从CK自动重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 5000L));
//2.4 设置任务关闭的时候保留最后一次CK数据
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
DebeziumSourceFunction<String> MysqlSource = MySQLSource.<String>builder()
.hostname("100.21.112.11")
.port(3306)
.deserializer(new MyDeserializationSchema()) //去参数里面找找实现类
.username("root")
.password("xxxx")
.startupOptions(StartupOptions.latest())// 读取binlog策略 这个启动选项有五种
.debeziumProperties(properties) //配置不要锁表 但是数据一致性不是精准一次 会变成最少一次
.build();
/*
* .startupOptions(StartupOptions.latest()) 参数配置
* 1.initial() 全量扫描并且继续读取最新的binlog 最佳实践是第一次使用这个
* 2.earliest() 从binlog的开头开始读取 就是啥时候开的binlog就从啥时候读
* 3.latest() 从最新的binlog开始读取
* 4.specificOffset(String specificOffsetFile, int specificOffsetPos) 指定offset读取
* 5.timestamp(long startupTimestampMillis) 指定时间戳读取
* */
env.addSource(MysqlSource).print();
env.execute("flink-cdc");
public static class MyDeserializationSchema implements DebeziumDeserializationSchema<String>
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception
Struct value = (Struct) sourceRecord.value();
Struct after = value.getStruct("after");
Struct source = value.getStruct("source");
String db = source.getString("db");//库名
String table = source.getString("table");//表名
//获取操作类型 直接将参数穿进去 会自己解析出来 里面是个enum对应每个操作
/* READ("r"),
CREATE("c"),
UPDATE("u"),
DELETE("d");*/
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String opstr = operation.toString().toLowerCase();
//类型修正 会把insert识别成create
if (opstr.equals("create"))
opstr = "insert";
//获取after结构体里面的表数据,封装成json输出
JSONObject json1 = new JSONObject();
JSONObject json2 = new JSONObject();
//加个判空
if (after != null)
List<Field> data = after.schema().fields(); //获取结构体
for (Field field : data)
String name = field.name(); //结构体的名字
Object value2 = after.get(field);//结构体的字段值
//放进json2里面去 json2放到json1里面去
json2.put(name, value2);
//整理成大json串输出
json1.put("db", db);
json1.put("table", table);
json1.put("data", json2);
json1.put("type", opstr);
collector.collect(json1.toJSONString());
@Override
public TypeInformation<String> getProducedType()
return TypeInformation.of(String.class);
3、全量读取某个数据库指定DB中的所有表
可以在build之前 ,添加一个
databaseList,用来指定特定的DB
public static void main(String[] args) throws Exception
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
// 配置 Debezium在初始化快照的时候(扫描历史数据的时候) =》 不要锁表
properties.setProperty("debezium.snapshot.locking.mode", "none");
env.enableCheckpointing(TimeUnit.SECONDS.toMillis(5));
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
//最大同时存在的ck数 和设置的间隔时间有一个就行
checkpointConfig.setMaxConcurrentCheckpoints(1);
//超时时间
checkpointConfig.setCheckpointTimeout(TimeUnit.SECONDS.toMillis(5));
//2.3 指定从CK自动重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 5000L));
//2.4 设置任务关闭的时候保留最后一次CK数据
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
DebeziumSourceFunction<String> MysqlSource = MySQLSource.<String>builder()
.hostname("100.21.112.11")
.port(3306)
.deserializer(new MyDeserializationSchema()) //去参数里面找找实现类
.username("root")
.password("xxxx")
.databaseList("horse") // 指定某个特定的库
.startupOptions(StartupOptions.latest())// 读取binlog策略 这个启动选项有五种
.debeziumProperties(properties) //配置不要锁表 但是数据一致性不是精准一次 会变成最少一次
.build();
/*
* .startupOptions(StartupOptions.latest()) 参数配置
* 1.initial() 全量扫描并且继续读取最新的binlog 最佳实践是第一次使用这个
* 2.earliest() 从binlog的开头开始读取 就是啥时候开的binlog就从啥时候读
* 3.latest() 从最新的binlog开始读取
* 4.specificOffset(String specificOffsetFile, int specificOffsetPos) 指定offset读取
* 5.timestamp(long startupTimestampMillis) 指定时间戳读取
* */
env.addSource(MysqlSource).print();
env.execute("flink-cdc");
public static class MyDeserializationSchema implements DebeziumDeserializationSchema<String>
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception
Struct value = (Struct) sourceRecord.value();
Struct after = value.getStruct("after");
Struct source = value.getStruct("source");
String db = source.getString("db");//库名
String table = source.getString("table");//表名
//获取操作类型 直接将参数穿进去 会自己解析出来 里面是个enum对应每个操作
/* READ("r"),
CREATE("c"),
UPDATE("u"),
DELETE("d");*/
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String opstr = operation.toString().toLowerCase();
//类型修正 会把insert识别成create
if (opstr.equals("create"))
opstr = "insert";
//获取after结构体里面的表数据,封装成json输出
JSONObject json1 = new JSONObject();
JSONObject json2 = new JSONObject();
//加个判空
if (after != null)
List<Field> data = after.schema().fields(); //获取结构体
for (Field field : data)
String name = field.name(); //结构体的名字
Object value2 = after.get(field);//结构体的字段值
//放进json2里面去 json2放到json1里面去
json2.put(name, value2);
//整理成大json串输出
json1.put("db", db);
json1.put("table", table);
json1.put("data", json2);
json1.put("type", opstr);
collector.collect(json1.toJSONString());
@Override
public TypeInformation<String> getProducedType()
return TypeInformation.of(String.class);
4、全量读取某个数据库指定DB中的指定表
可以在build之前 ,添加一个
tableList,用来指定特定的DB中的特定表
public static void main(String[] args) throws Exception
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
// 配置 Debezium在初始化快照的时候(扫描历史数据的时候) =》 不要锁表
properties.setProperty("debezium.snapshot.locking.mode", "none");
env.enableCheckpointing(TimeUnit.SECONDS.toMillis(5));
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
//最大同时存在的ck数 和设置的间隔时间有一个就行
checkpointConfig.setMaxConcurrentCheckpoints(1);
//超时时间
checkpointConfig.setCheckpointTimeout(TimeUnit.SECONDS.toMillis(5));
//2.3 指定从CK自动重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 5000L));
//2.4 设置任务关闭的时候保留最后一次CK数据
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
DebeziumSourceFunction<String> MysqlSource = MySQLSource.<String>builder()
.hostname("100.21.112.11")
.port(3306)
.deserializer(new MyDeserializationSchema()) //去参数里面找找实现类
.username("root")
.password("xxxx")
.databaseList("horse") // 指定某个特定的库
.tableList("horse.t_dri_info") //指定特定的表
.startupOptions(StartupOptions.latest())// 读取binlog策略 这个启动选项有五种
.debeziumProperties(properties) //配置不要锁表 但是数据一致性不是精准一次 会变成最少一次
.build();
/*
* .startupOptions(StartupOptions.latest()) 参数配置
* 1.initial() 全量扫描并且继续读取最新的binlog 最佳实践是第一次使用这个
* 2.earliest() 从binlog的开头开始读取 就是啥时候开的binlog就从啥时候读
* 3.latest() 从最新的binlog开始读取
* 4.specificOffset(String specificOffsetFile, int specificOffsetPos) 指定offset读取
* 5.timestamp(long startupTimestampMillis) 指定时间戳读取
* */
env.addSource(MysqlSource).print();
env.execute("flink-cdc");
public static class MyDeserializationSchema implements DebeziumDeserializationSchema<String>
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception
Struct value = (Struct) sourceRecord.value();
Struct after = value.getStruct("after");
Struct source = value.getStruct("source");
String db = source.getString("db");//库名
String table = source.getString("table");//表名
//获取操作类型 直接将参数穿进去 会自己解析出来 里面是个enum对应每个操作
/* READ("r"),
CREATE("c"),
UPDATE("u"),
DELETE("d");*/
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String opstr = operation.toString().toLowerCase();
//类型修正 会把insert识别成create
if (opstr.equals("create"))
opstr = "insert";
//获取after结构体里面的表数据,封装成json输出
JSONObject json1 = new JSONObject();
JSONObject json2 = new JSONObject();
//加个判空
if (after != null)
List<Field> data = after.schema().fields(); //获取结构体
for (Field field : data)
String name = field.name(); //结构体的名字
Object value2 = after.get(field);//结构体的字段值
//放进json2里面去 json2放到json1里面去
json2.put(name, value2);
//整理成大json串输出
json1.put("db", db);
json1.put("table", table);
json1.put("data", json2);
json1.put("type", opstr);
collector.collect(json1.toJSONString());
@Override
public TypeInformation<String> getProducedType()
return TypeInformation.of(String.class);
以上是关于Flink CDC 读取MySQL的数据的主要内容,如果未能解决你的问题,请参考以下文章
flink实时读取mongodb方案调研-实现mongodb cdc
flink实时读取mongodb方案调研-实现mongodb cdc