如何将新表添加到 Debezium MySQL 连接器?
Posted
技术标签:
【中文标题】如何将新表添加到 Debezium MySQL 连接器?【英文标题】:How to add new table to Debezium MySQL connector? 【发布时间】:2021-05-19 03:08:21 【问题描述】:我有 Debezium mysql 连接器:
"name": "debezium_mysql",
"config":
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "***",
"database.port": "3306",
"database.user": "kafkaconnect",
"database.password": "$file:/connect-credentials.properties:mysql_pass",
"database.server.name": "mysql",
"heartbeat.interval.ms": 5000,
"snapshot.mode": "when_needed",
"database.include.list": "compare_sports",
"table.include.list": "compare_sports.matches,compare_sports.games",
"database.history.kafka.topic": "mysql_compare_sports_history",
"database.history.kafka.recovery.poll.interval.ms": 5000,
"database.history.kafka.bootstrap.servers": "***:9092",
"include.schema.changes": "false",
"transforms": "extractInt",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "id"
我想从同一个mysql中的其他数据库添加新表(存在很长时间)。将其添加到包含列表后,出现错误:
Encountered change event for table new_database.new_table whose schema isn't known to this connector
我尝试使用snapshot.mode: initial
创建新的连接器,但随后只创建了新的mysql_history
主题,但没有所需的new_database.new_table
主题
我应该怎么做才能将新数据库中的新表添加到现有连接器?
谢谢
【问题讨论】:
此配置项需要将表列为schemaName.tableName
【参考方案1】:
您可以创建一个新的连接器并将snapshot
设置为initial
并指定一个不同的database.server.id
由于表已经存在,您应该包含snapshot.override
以及默认实现将在现有表中选择所有行,这将持有锁并阻止写入者写入数据库.你绝对不想这样做。
...
"snapshot.mode"="initial",
"database.server.id": "different",
"snapshot.select.statement.overrides":"new-table",
"snapshot.select.statement.new-table":"select * .. where <>"
...
或者,如果您想使用相同的连接器名称,则必须停止它并手动清除其在内部 connect.offsets.storage
主题中的偏移量。然后,当重新创建连接器(使用相同的名称)时,它将根据您提供的快照配置恢复其偏移量。
更多这样的案例可以看这个blog(光盘:我是作者)
【讨论】:
【参考方案2】:从 debezium:1.6 开始,您应该能够发送信号以对新添加的表进行快照。 https://debezium.io/documentation/reference/1.7/configuration/signalling.html https://debezium.io/blog/2021/05/06/debezium-1-6-alpha1-released/
通过在特定表中插入记录来发送信号:
INSERT INTO myschema.debezium_signal VALUES('ad-hoc-1', 'execute-snapshot', '"data-collections": ["schema1.table1", "schema1.table2"]')
要使此功能可用,您应该首先创建此表
CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
* 不同数据库类型的语法可能不同
还必须在捕获的数据集合中添加信号表,即包含在table.include.list
参数中。
【讨论】:
以上是关于如何将新表添加到 Debezium MySQL 连接器?的主要内容,如果未能解决你的问题,请参考以下文章
在 Debezium Mysql 连接器中将更多表列入白名单的有效方法
在 EF 6 和 MVC 5 中使用 Code First 方法将新表添加到现有数据库
将新表列添加到 Microsoft SQL Server 中的特定序号位置