FlinkCDC Deployment
Posted by 小基基o_O
Flink Installation
1. Download and extract
wget https://archive.apache.org/dist/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz
tar -zxf flink-1.13.6-bin-scala_2.12.tgz
mv flink-1.13.6 /opt/module/flink
2. Environment variables
vim /etc/profile.d/my_env.sh
export HADOOP_CLASSPATH=`hadoop classpath`
3. Distribute the environment variables
source ~/bin/source.sh
4. In Per-Job-Cluster mode you may hit this error:
Exception in thread "Thread-5" java.lang.IllegalStateException:
Trying to access closed classloader.
Please check if you store classloaders directly or indirectly in static fields.
If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately,
you can disable this check with the configuration 'classloader.check-leaked-classloader'.
To fix it, edit the configuration file
vim /opt/module/flink/conf/flink-conf.yaml
and add the following line:
classloader.check-leaked-classloader: false
5. Download the flink-sql-connector-kafka and fastjson 1.2.83 jars (the links can be found on Maven Central):
cd /opt/module/flink/lib
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.13.6/flink-sql-connector-kafka_2.12-1.13.6.jar
wget https://repo1.maven.org/maven2/com/alibaba/fastjson/1.2.83/fastjson-1.2.83.jar
Job Deployment
1. Test code
package org.example;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Objects;

public class TestCDC {
    public static void main(String[] args) throws Exception {
        // TODO 1: create the stream execution environment and set the parallelism
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        // TODO 2: create the Flink MySQL CDC source
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("hadoop107")
                .port(3306)
                .username("root")
                .password("password")  // placeholder: the MySQL root password
                .databaseList("db1")   // databases to capture
                .tableList("db1.t")    // tables to capture (the database prefix cannot be omitted)
                .deserializer(new JsonDebeziumDeserializationSchema())  // deserialize each SourceRecord into a JSON string
                .startupOptions(StartupOptions.initial())  // startup strategy: take an initial snapshot of the monitored tables, then keep reading the latest binlog
                .build();
        // TODO 3: read the data and write it to Kafka
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "sourceName")
                .map(JSONObject::parseObject).map(Objects::toString)
                .addSink(new FlinkKafkaProducer<>("hadoop105:9092", "topic01", new SimpleStringSchema()));
        // TODO 4: execute
        env.execute();
    }
}
2. Build configuration
Jars already present on the server do not need to be bundled into the fat jar; mark those dependencies with
<scope>provided</scope>
flink-connector-mysql-cdc and flink-table-api-java-bridge
must be packaged into the jar.
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.6</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.13.6</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.13.6</version>
<scope>provided</scope>
</dependency>
<!-- FlinkCDC -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>
<!-- JSON processing -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
<scope>provided</scope>
</dependency>
<!-- Flink_Kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.13.6</version>
<scope>provided</scope>
</dependency>
</dependencies>
<!-- Build plugin -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
3. Package and upload
Run the Maven package build (the assembly plugin is bound to the package phase) and upload the resulting
jar-with-dependencies
jar to the server.
4. Test
Start a console consumer on the target topic:
kafka-console-consumer.sh --bootstrap-server hadoop105:9092 --topic topic01
Submit the job in YARN application mode (run-application takes -D settings rather than the legacy -nm/-ys/-yjm/-ytm options):
/opt/module/flink/bin/flink run-application \
-t yarn-application \
-Dyarn.application.name=a3 \
-Dtaskmanager.numberOfTaskSlots=2 \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=4096m \
-c org.example.TestCDC \
FlinkCDC-1.0-SNAPSHOT-jar-with-dependencies.jar
An overview of the JSON test results:

| Database operation | op | before | after |
|---|---|---|---|
| insert | c | null | row data |
| update | d, then c | row data | row data |
| delete | d | row data | null |

The update in this test changes the primary key, so CDC emits a delete followed by a create; an update that leaves the primary key unchanged produces a single event with op "u". The database and table names appear in the
source
field.
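A downstream consumer typically branches on the op field of each envelope. A minimal stdlib-only sketch of that routing, assuming the JSON strings produced above (CdcOpRouter and its regex-based field extraction are illustrative shortcuts; a real job would parse the envelope with a JSON library such as fastjson):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CdcOpRouter {
    // Matches the top-level "op" field of a Debezium-style JSON envelope.
    private static final Pattern OP = Pattern.compile("\"op\"\\s*:\\s*\"(\\w+)\"");

    // Returns the op code ("r", "c", "u", "d"), or null if no op field is present.
    public static String extractOp(String json) {
        Matcher m = OP.matcher(json);
        return m.find() ? m.group(1) : null;
    }

    // Maps an op code to the database operation it represents.
    public static String describe(String op) {
        switch (op) {
            case "r": return "snapshot read";
            case "c": return "insert";
            case "u": return "update";
            case "d": return "delete";
            default:  return "unknown";
        }
    }

    public static void main(String[] args) {
        String envelope = "{\"before\": null, \"after\": {\"a\": 2}, \"op\": \"c\"}";
        System.out.println(describe(extractOp(envelope)));  // prints "insert"
    }
}
```

The regex shortcut works here because the envelope's source block contains no field named op; with arbitrary JSON a real parser would be required.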
1. Initial snapshot of the monitored table
-- Create the database
DROP DATABASE IF EXISTS db1;CREATE DATABASE db1;
-- Create the table
CREATE TABLE db1.t(a INT PRIMARY KEY,b TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
-- Insert a row
INSERT db1.t(a,b) VALUES (1,'2022-10-24 00:00:00');
JSON
{
  "before": null,
  "after": {
    "a": 1,
    "b": "2022-10-24T00:00:00Z"
  },
  "source": {
    "version": "1.5.4.Final",
    "connector": "mysql",
    "name": "mysql_binlog_source",
    "ts_ms": 1670656489808,
    "snapshot": "false",
    "db": "db1",
    "sequence": null,
    "table": "t",
    "server_id": 0,
    "gtid": null,
    "file": "",
    "pos": 0,
    "row": 0,
    "thread": null,
    "query": null
  },
  "op": "r",
  "ts_ms": 1670656489815,
  "transaction": null
}
2. Insert a row
INSERT db1.t(a) VALUES (2);
JSON
{
  "before": null,
  "after": {
    "a": 2,
    "b": "2022-12-10T07:15:52Z"
  },
  "source": {
    "version": "1.5.4.Final",
    "connector": "mysql",
    "name": "mysql_binlog_source",
    "ts_ms": 1670656552000,
    "snapshot": "false",
    "db": "db1",
    "sequence": null,
    "table": "t",
    "server_id": 1,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 5152,
    "row": 0,
    "thread": null,
    "query": null
  },
  "op": "c",
  "ts_ms": 1670656552743,
  "transaction": null
}
3. Update a row
UPDATE db1.t SET a=3 WHERE a=1;
SELECT * FROM db1.t;
This update rewrites the primary key, so two messages are emitted. Note also that b is rendered in UTC: the server apparently runs at UTC+8, so the '2022-10-24 00:00:00' inserted earlier appears as 2022-10-23T16:00:00Z.
JSON (two messages)
{
  "before": {
    "a": 1,
    "b": "2022-10-23T16:00:00Z"
  },
  "after": null,
  "source": {
    "version": "1.5.4.Final",
    "connector": "mysql",
    "name": "mysql_binlog_source",
    "ts_ms": 1670656602000,
    "snapshot": "false",
    "db": "db1",
    "sequence": null,
    "table": "t",
    "server_id": 1,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 5434,
    "row": 0,
    "thread": null,
    "query": null
  },
  "op": "d",
  "ts_ms": 1670656602253,
  "transaction": null
}
{
  "before": null,
  "after": {
    "a": 3,
    "b": "2022-10-23T16:00:00Z"
  },
  "source": {
    "version": "1.5.4.Final",
    "connector": "mysql",
    "name": "mysql_binlog_source",
    "ts_ms": 1670656602000,
    "snapshot": "false",
    "db": "db1",
    "sequence": null,
    "table": "t",
    "server_id": 1,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 5434,
    "row": 0,
    "thread": null,
    "query": null
  },
  "op": "c",
  "ts_ms": 1670656602253,
  "transaction": null
}
4. Delete a row
DELETE FROM db1.t WHERE a=3;
JSON
{
  "before": {
    "a": 3,
    "b": "2022-10-23T16:00:00Z"
  },
  "after": null,
  "source": {
    "version": "1.5.4.Final",
    "connector": "mysql",
    "name": "mysql_binlog_source",
    "ts_ms": 1670656744000,
    "snapshot": "false",
    "db": "db1",
    "sequence": null,
    "table": "t",
    "server_id": 1,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 5717,
    "row": 0,
    "thread": null,
    "query": null
  },
  "op": "d",
  "ts_ms": 1670656744059,
  "transaction": null
}
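The event sequence above (snapshot read, insert, key-changing update arriving as delete plus create, then delete) is enough for a consumer to maintain an in-memory replica of the table. A stdlib-only sketch under that assumption (TableReplica and its simplified string payloads are illustrative, not part of the deployment; real code would carry the full before/after row objects):

```java
import java.util.HashMap;
import java.util.Map;

public class TableReplica {
    // In-memory replica of db1.t, keyed by the primary key column a.
    private final Map<Integer, String> rows = new HashMap<>();

    // Applies one CDC event; op mirrors the envelope field, key/row stand in for before/after.
    public void apply(String op, int key, String row) {
        switch (op) {
            case "r": // snapshot read behaves like an insert
            case "c": // create
            case "u": // in-place update (primary key unchanged)
                rows.put(key, row);
                break;
            case "d": // delete
                rows.remove(key);
                break;
            default:
                throw new IllegalArgumentException("unknown op: " + op);
        }
    }

    public Map<Integer, String> snapshot() {
        return new HashMap<>(rows);
    }

    public static void main(String[] args) {
        TableReplica t = new TableReplica();
        t.apply("r", 1, "2022-10-24 00:00:00"); // initial snapshot of row a=1
        t.apply("c", 2, "2022-12-10 15:15:52"); // INSERT a=2
        t.apply("d", 1, null);                  // UPDATE a=1 -> a=3, message 1: delete the old key
        t.apply("c", 3, "2022-10-24 00:00:00"); // UPDATE a=1 -> a=3, message 2: create the new key
        t.apply("d", 3, null);                  // DELETE a=3
        System.out.println(t.snapshot());       // only the row with a=2 remains
    }
}
```

Replaying the document's own test sequence leaves exactly one row (a=2), matching the final state of db1.t.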