How to replicate all changes from source to destination db using debezium and confluent-sink-connector running on docker

Posted: 2021-01-30 19:41:56

Question:

The code below is the Dockerfile for my Kafka Connect image with the Confluent JDBC sink connector and the MySQL driver:

FROM debezium/connect:1.3
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc
ENV MYSQL_DRIVER_VERSION=8.0.20
ARG KAFKA_JDBC_VERSION=5.5.0
RUN curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-$MYSQL_DRIVER_VERSION.tar.gz" \
    | tar -xzf - -C /kafka/libs --strip-components=1 mysql-connector-java-$MYSQL_DRIVER_VERSION/mysql-connector-java-$MYSQL_DRIVER_VERSION.jar
RUN mkdir $KAFKA_CONNECT_JDBC_DIR && cd $KAFKA_CONNECT_JDBC_DIR &&\
    curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar
The image is built with:

docker build . --tag kafka-connect-sink
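
For context, a minimal sketch of how the resulting image could be started; the container names zookeeper, kafka and mysql and the GROUP_ID / *_STORAGE_TOPIC settings follow the standard Debezium tutorial setup and are assumptions, not taken from the question:

# Run Kafka Connect from the custom image built above, linked to the
# ZooKeeper, Kafka and MySQL containers and exposing the REST API on 8083.
docker run -it --rm --name connect -p 8083:8083 \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=my_connect_configs \
  -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
  -e STATUS_STORAGE_TOPIC=my_connect_statuses \
  --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql \
  kafka-connect-sink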

Below is the JSON for my source connector:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.99.102:8083/connectors/ -d '{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.include.list": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "dbhistory.inventory"
    }
}'
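
As a quick sanity check (not part of the original question), the Kafka Connect REST API can confirm that the source connector and its task are RUNNING before the sinks are registered:

# Query the status of the registered source connector
curl -s 192.168.99.102:8083/connectors/inventory-connector/status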

Below are the JSONs for my destination (sink) connectors:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.99.102:8083/connectors/ -d '{
    "name": "inventory-connector-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:mysql://192.168.0.104:3306/pk?useSSL=false",
        "connection.user": "pavan",
        "connection.password": "root",
        "topics": "dbserver1.inventory.customers",
        "table.name.format": "pk.customers",
        "auto.create": "true",
        "auto.evolve": "true",
        "delete.enabled": "true",
        "insert.mode": "upsert",
        "pk.fields": "id",
        "pk.mode": "record_key",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.unwrap.delete.handling.mode": "rewrite"
    }
}'

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.99.102:8083/connectors/ -d '{
    "name": "inventory-connector-sink-addresses",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:mysql://192.168.0.104:3306/pk?useSSL=false",
        "connection.user": "pavan",
        "connection.password": "root",
        "topics": "dbserver1.inventory.addresses",
        "table.name.format": "pk.addresses",
        "auto.create": "true",
        "auto.evolve": "true",
        "delete.enabled": "true",
        "insert.mode": "upsert",
        "pk.fields": "id",
        "pk.mode": "record_key",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.unwrap.delete.handling.mode": "rewrite"
    }
}'
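
As an aside (not in the original post), the Connect REST API can list everything registered so far, one sink connector per table:

# List all connectors registered on this Connect worker
curl -s 192.168.99.102:8083/connectors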

With this configuration I have to subscribe to each topic with its own connector, but the problem is that I have more than 100 tables to replicate into the destination database. Is there any way I can do it in a single JSON configuration, so that one connector subscribes to all the topics?


Answer 1:

You can define the list of topics to consume with the topics (or topics.regex) property, and override the destination table names with the JDBC sink connector's table.name.format property or the RegexRouter SMT (or a combination of both):

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.99.102:8083/connectors/ -d '{
    "name": "inventory-connector-sink-addresses",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:mysql://192.168.0.104:3306/pk?useSSL=false",
        "connection.user": "pavan",
        "connection.password": "root",
        "topics": "dbserver1.inventory.addresses,dbserver1.inventory.customers",
        "auto.create": "true",
        "auto.evolve": "true",
        "delete.enabled": "true",
        "insert.mode": "upsert",
        "pk.fields": "",
        "pk.mode": "record_key",
        "transforms": "route,unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.unwrap.delete.handling.mode": "rewrite",

        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "pk.$3"
    }
}'
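
Since the answer also mentions topics.regex, here is a sketch of the single line that would change when using a pattern instead of an explicit list (the regex is an assumption covering every topic under dbserver1.inventory). Note that topics and topics.regex are mutually exclusive, so the "topics" entry must be removed:

        "topics.regex": "dbserver1\\.inventory\\..*",

Combined with the RegexRouter replacement pk.$3, each table keeps its own name in the pk schema, so newly captured tables can be consumed without registering another sink connector.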

Discussion:

Hi, thanks for the solution, but every table has a different primary key column name, so this only works for tables whose primary key is named "id". If, say, the customers table had cust_seq as its primary key, it would throw an error.

Leave pk.fields empty. In that case all the fields from the key struct will be used.
