不使用 Kafka Connect 复制架构更改
Posted
技术标签:
【中文标题】不使用 Kafka Connect 复制架构更改【英文标题】:Schema Changes Not Replicating with Kafka Connect 【发布时间】:2021-08-19 04:45:54 【问题描述】:目标是:mysql -> Kafka -> MySQL。接收器目标应与生产保持同步。
插入和删除记录很好,但我遇到了架构更改问题,例如删除的列。更改不会复制到接收器目标。
我的来源:
"name": "hub-connector",
"config":
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "42",
"database.server.name": "study",
"database.include.list": "companyHub",
"database.history.kafka.bootstrap.servers": "broker:29092",
"database.history.kafka.topic": "dbhistory.study",
"include.schema.changes": "true"
我的水槽
"name": "companyHub-sink",
"config":
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://172.18.141.102:3306/Hub",
"connection.user": "user",
"connection.password": "passaword",
"topics": "study.companyHub.countryTaxes, study.companyHub.addresses",
"auto.create": "true",
"auto.evolve": "true",
"delete.enabled": "true",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_key",
"transforms": "dropPrefix, unwrap",
"transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex":"study.companyHub.(.*)$",
"transforms.dropPrefix.replacement":"$1",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite"
任何帮助将不胜感激!
【问题讨论】:
【参考方案1】:自动创造和自动进化 提示
确保 JDBC 用户具有适当的 DDL 权限。
如果启用了 auto.create,如果发现目标表丢失,连接器可以创建目标表。由于连接器使用记录模式作为表定义的基础,因此创建在线进行,记录从主题中使用。主键是根据键配置设置指定的。
如果启用了 auto.evolve,则连接器可以在遇到发现缺少列的记录时通过在目标表上发出 ALTER 来执行有限的自动进化。由于数据类型的更改和列的删除可能很危险,因此连接器不会尝试对表执行此类演变。也没有尝试添加主键约束。相反,如果 auto.evolve 被禁用,则不会执行任何进化,并且连接器任务会失败并出现错误,指出缺少列。
对于自动创建和自动演化,列的可空性基于架构中相应字段的可选性,并且如果适用,还根据相应字段的默认值指定默认值。我们使用以下从 Connect 模式类型到数据库特定类型的映射:
架构类型 MySQL Oracle PostgreSQL SQLite SQL Server Vertica
此处未提及的数据库不支持自动创建或自动进化。
重要
对于向后兼容的表架构演变,记录架构中的新字段必须是可选的或具有默认值。如果您需要删除字段,则应手动更改表架构以删除相应的列、为其分配默认值或使其可为空。
【讨论】:
“由于数据类型更改和列删除可能很危险,因此连接器不会尝试在表上执行此类演变” 谢谢...你是说不可能吗? 根据 jdbc sink 连接器文档,不支持更改 drop 列,虽然我发现以下文档似乎试图解决这个问题,但请注意由您来处理ddl的变化。 debezium.io/documentation/reference/connectors/… ... 架构更改主题以上是关于不使用 Kafka Connect 复制架构更改的主要内容,如果未能解决你的问题,请参考以下文章
Kafka Connect:使用 debezium 从 Postgres 流式传输更改到主题
如何使用debezium更改数据捕获在mysql中捕获数据并在kafka connect中使用jdbc sink?