如何将新表添加到 Debezium MySQL 连接器?

Posted

技术标签:

【中文标题】如何将新表添加到 Debezium MySQL 连接器?【英文标题】:How to add new table to Debezium MySQL connector? 【发布时间】:2021-05-19 03:08:21 【问题描述】:

我有 Debezium mysql 连接器:


    "name": "debezium_mysql",
    "config": 
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "***",
        "database.port": "3306",
        "database.user": "kafkaconnect",
        "database.password": "$file:/connect-credentials.properties:mysql_pass",
        "database.server.name": "mysql",
        "heartbeat.interval​.ms": 5000,
        "snapshot.mode": "when_needed",
        "database.include.list": "compare_sports",
        "table.include.list": "compare_sports.matches,compare_sports.games",
        "database.history.kafka.topic": "mysql_compare_sports_history",
        "database.history​.kafka.recovery​.poll.interval.ms": 5000,
        "database.history.kafka.bootstrap.servers": "***:9092",
        "include.schema.changes": "false",
        "transforms": "extractInt",
        "transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractInt.field": "id"
    

我想从同一个mysql中的其他数据库添加新表(存在很长时间)。将其添加到包含列表后,出现错误:

Encountered change event for table new_database.new_table whose schema isn't known to this connector

我尝试使用snapshot.mode: initial 创建新的连接器,但随后只创建了新的mysql_history 主题,但没有所需的new_database.new_table 主题

我应该怎么做才能将新数据库中的新表添加到现有连接器?

谢谢

【问题讨论】:

此配置项需要将表列为schemaName.tableName 【参考方案1】:

您可以创建一个新的连接器并将snapshot 设置为initial 并指定一个不同的database.server.id

由于表已经存在,您应该包含snapshot.override 以及默认实现将在现有表中选择所有行,这将持有锁并阻止写入者写入数据库.你绝对不想这样做。


    ...
    "snapshot.mode"="initial",
    "database.server.id": "different",
    "snapshot.select.statement.overrides":"new-table",
    "snapshot.select.statement.new-table":"select * .. where <>"
    ...

或者,如果您想使用相同的连接器名称,则必须停止它并手动清除其在内部 connect.offsets.storage 主题中的偏移量。然后,当重新创建连接器(使用相同的名称)时,它将根据您提供的快照配置恢复其偏移量。

更多这样的案例可以看这个blog(光盘:我是作者)

【讨论】:

【参考方案2】:

从 debezium:1.6 开始,您应该能够发送信号以对新添加的表进行快照。 https://debezium.io/documentation/reference/1.7/configuration/signalling.html https://debezium.io/blog/2021/05/06/debezium-1-6-alpha1-released/

通过在特定表中插入记录来发送信号:

INSERT INTO myschema.debezium_signal VALUES('ad-hoc-1', 'execute-snapshot', '"data-collections": ["schema1.table1", "schema1.table2"]')

要使此功能可用,您应该首先创建此表

CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);

* 不同数据库类型的语法可能不同

还必须在捕获的数据集合中添加信号表,即包含在table.include.list 参数中。

【讨论】:

以上是关于如何将新表添加到 Debezium MySQL 连接器?的主要内容,如果未能解决你的问题,请参考以下文章

在 Debezium Mysql 连接器中将更多表列入白名单的有效方法

Debezium 创建新主题时如何编辑复制因子

在 EF 6 和 MVC 5 中使用 Code First 方法将新表添加到现有数据库

将新表列添加到 Microsoft SQL Server 中的特定序号位置

在运行时将新表重新加载/替换到 WPF DataGrid 中

如何仅使用 javascript 创建/添加新表行