不使用 Kafka Connect 复制架构更改

Posted

技术标签:

【中文标题】不使用 Kafka Connect 复制架构更改【英文标题】:Schema Changes Not Replicating with Kafka Connect 【发布时间】:2021-08-19 04:45:54 【问题描述】:

目标是:mysql -> Kafka -> MySQL。接收器目标应与生产保持同步。

插入和删除记录很好,但我遇到了架构更改问题,例如删除的列。更改不会复制到接收器目标。

我的来源:


    "name": "hub-connector",
    "config": 
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "42",
        "database.server.name": "study",
        "database.include.list": "companyHub",
        "database.history.kafka.bootstrap.servers": "broker:29092",
        "database.history.kafka.topic": "dbhistory.study",
        "include.schema.changes": "true"
    


我的水槽


    "name": "companyHub-sink",
    "config": 
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:mysql://172.18.141.102:3306/Hub",
        "connection.user": "user",
        "connection.password": "passaword",
        "topics": "study.companyHub.countryTaxes, study.companyHub.addresses",
        "auto.create": "true",
        "auto.evolve": "true",
        "delete.enabled": "true",
        "insert.mode": "upsert",
        "pk.fields": "id",
        "pk.mode": "record_key",
        "transforms": "dropPrefix, unwrap",
        "transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.dropPrefix.regex":"study.companyHub.(.*)$",
        "transforms.dropPrefix.replacement":"$1",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.unwrap.delete.handling.mode": "rewrite"
    

任何帮助将不胜感激!

【问题讨论】:

【参考方案1】:

自动创造和自动进化 提示

确保 JDBC 用户具有适当的 DDL 权限。

如果启用了 auto.create,如果发现目标表丢失,连接器可以创建目标表。由于连接器使用记录模式作为表定义的基础,因此创建在线进行,记录从主题中使用。主键是根据键配置设置指定的。

如果启用了 auto.evolve,则连接器可以在遇到发现缺少列的记录时通过在目标表上发出 ALTER 来执行有限的自动进化。由于数据类型的更改和列的删除可能很危险,因此连接器不会尝试对表执行此类演变。也没有尝试添加主键约束。相反,如果 auto.evolve 被禁用,则不会执行任何进化,并且连接器任务会失败并出现错误,指出缺少列。

对于自动创建和自动演化,列的可空性基于架构中相应字段的可选性,并且如果适用,还根据相应字段的默认值指定默认值。我们使用以下从 Connect 模式类型到数据库特定类型的映射:

架构类型 MySQL Oracle PostgreSQL SQLite SQL Server Vertica

此处未提及的数据库不支持自动创建或自动进化。

重要

对于向后兼容的表架构演变,记录架构中的新字段必须是可选的或具有默认值。如果您需要删除字段,则应手动更改表架构以删除相应的列、为其分配默认值或使其可为空。

【讨论】:

“由于数据类型更改和列删除可能很危险,因此连接器不会尝试在表上执行此类演变” 谢谢...你是说不可能吗? 根据 jdbc sink 连接器文档,不支持更改 drop 列,虽然我发现以下文档似乎试图解决这个问题,但请注意由您来处理ddl的变化。 debezium.io/documentation/reference/connectors/… ... 架构更改主题

以上是关于不使用 Kafka Connect 复制架构更改的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Connect:使用 debezium 从 Postgres 流式传输更改到主题

Kafka Connect:将消息从字节转换为 Json

如何使用debezium更改数据捕获在mysql中捕获数据并在kafka connect中使用jdbc sink?

即使json数据包含架构和有效负载字段,kafka connect hdfs sink连接器也会失败

Kafka Connect JDBC 接收器连接器不起作用

使用 Kafka HDFS Connect 写入 HDFS 时出错