使用表白名单选项更新 Debezium MySQL 连接器

Posted

技术标签:

【中文标题】使用表白名单选项更新 Debezium MySQL 连接器【英文标题】:Updating a Debezium MySQL connector with table whitelist option 【发布时间】:2019-04-29 20:18:25 【问题描述】:

我正在使用 Debezium (0.7.5) mysql 连接器,如果我想使用选项 table.whitelist 更新此配置,我正在尝试了解最佳方法。

假设我创建了一个连接器,如下所示:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://debezium-host/connectors/ -d '

  "name": "MyConnector",
  "config": 
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "connect.timeout.ms": "60000",
      "tasks.max": "1",
      "database.hostname": "myhost",
      "database.port": "3306",
      "database.user": "***",
      "database.password": "***",
      "database.server.id": "3227197",
      "database.server.name": "MyServer",
      "database.whitelist": "myDb",
      "table.whitelist": "myDb.table1,myDb.table2",
      "database.history.kafka.bootstrap.servers": "kb0:9092,kb1:9092,kb2:9092",
      "database.history.kafka.topic": "MyConnectorHistoryTopic",
      "max.batch.size": "1024",
      "snapshot.mode": "initial",
      "decimal.handling.mode": "double"
    
'

一段时间后(2 周),我需要在这个 table.whitelist 选项中添加一个新表 (myDb.table3)(而且这个表是旧表,它是在连接器之前创建的)

我尝试的是:

暂停连接器。 删除了历史主题(也许这是问题所在?)。 通过 API 更新配置端点更新了配置。 恢复连接器。

通过 API 更新命令:

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" https://kafka-connect-host/connectors/MyConnector/config/ -d '

  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "connect.timeout.ms": "60000",
  "tasks.max": "1",
  "database.hostname": "myhost",
  "database.port": "3306",
  "database.user": "***",
  "database.password": "***",
  "database.server.id": "3227197",
  "database.server.name": "MyServer",
  "database.whitelist": "myDb",
  "table.whitelist": "myDb.table1,myDb.table2,myDb.table3",
  "database.history.kafka.bootstrap.servers": "kb0:9092,kb1:9092,kb2:9092",
  "database.history.kafka.topic": "MyConnectorHistoryTopic",
  "max.batch.size": "1024",
  "snapshot.mode": "schema_only",
  "decimal.handling.mode": "double"
'

但它不起作用,也许这根本不是最好的方法。 在其他连接器中,我没有使用选项table.whitelist,所以当我需要收听新表时,我没有这个问题。

我的最后一个选择是删除此连接器并使用此新配置创建另一个连接器,同时监听新表 (myDb.table3)。问题是如果我想要来自myDb.table3 的初始数据,我必须使用快照initial 创建,但我不想从其他表myDb.table1,myDb.table2 的快照中生成所有消息。

【问题讨论】:

你可以发布一个全新的配置而不是更新现有的配置 @cricket_007 您的意思是删除现有连接器并创建一个新连接器?或者只是创建另一个名称不同的连接器? 我猜你也可以,但删除任何东西都会影响正在运行的连接器 【参考方案1】:

目前尚不支持更改白名单/黑名单配置。目前正在处理此问题(请参阅DBZ-175),我们希望在下一个版本中对此提供预览支持。这里有一个pending PR,不过需要做更多的工作。

在此实施之前,您最好的选择是设置一个新的连接器实例,捕获您感兴趣的其他表。这是以运行两个连接器为代价的(两者都将维护一个 binlog 阅读器会话),但只要您不需要过于频繁地更改过滤器配置,它就可以解决问题。

【讨论】:

谢谢,@Gunnar。我按照你的建议做了。现在,我计划从 Debezium 7.5 迁移到 9 以避免这个问题。 @japoneizo 你有新版本的问题解决方案吗?如果是,请发送给我您正在使用的 debezium 确切版本。 @japoneizo 您当前使用的是哪个版本?如果您使用的是 0.10,那么它运行良好(确保此配置 binlog_row_image=full)。如果是 0.9.2.Final,你是如何实现的? 最新版本的 DBZ 是否也会出现此问题?喜欢 1.1 版吗? 这仍然是个问题吗?【参考方案2】:

最新版本的Debezium Server,可以添加如下配置

debezium.snapshot.new.tables=parallel

如果你使用的是 Debezium,你可以试试这个配置值

snapshot.new.tables=parallel

注意:Debeziyum 服务器是支持 Kinesis、Google Pub sub 和 Apache Pulsar 的服务器。我正在使用它,它的配置有点不同。我必须在每个项目之前添加“debezium”

添加此配置后,任何添加到 tables.whitelist 的内容,Debezium 都会为这些额外的表创建快照。

我无法将您指向文档,但我在 GitHub 中浏览了他们的代码,并且我实际尝试了它对我有用。 这是 MySqlConnector 代码的链接

https://github.com/debezium/debezium/blob/master/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java

那里搜索 Field.create("snapshot.new.tables")

个人感觉 Debezium 东西很多,但是文档比较分散。

【讨论】:

嘿 Pavan,您能否发布一个指向有关 snapshot.new.tables 配置的文档的链接?我在文档网站上找不到它 嗨@MarkNS,我无法将您指向文档,但我在 github 中浏览了他们的代码,并且我实际尝试了它对我有用。这里是MySqlConnector代码的链接github.com/debezium/debezium/blob/master/…那里搜索Field.create("snapshot.new.tables") 个人感觉Debezium东西很多但是文档比较零散。【参考方案3】:

我有同样的问题,并用 debezium 的信号表解决。它以这种方式工作,您必须创建一个表以发送到数据表中的 debezium 命令。

CREATE TABLE public.debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32)  NULL, data VARCHAR(2048)  NULL);

并在你的配置中设置做 debzium 一个标签"signal.data.collection": "public.debezium_signal"

之后,您可以在该表中使用 insert 发送命令:

INSERT INTO debezium_signal (id, type, data)
VALUES(gen_random_uuid(),'execute-snapshot','"data-collections": "myDb.table3"]');

在我的情况下,我必须在 table.include.list 和 column.include.list 中添加 de table 信号。

https://debezium.io/documentation/reference/stable/configuration/signalling.html

【讨论】:

以上是关于使用表白名单选项更新 Debezium MySQL 连接器的主要内容,如果未能解决你的问题,请参考以下文章

在 Debezium Mysql 连接器中将更多表列入白名单的有效方法

如何检查 debezium 快照是不是完整

如何使用 debezium mysql 连接器 kafka 进行初始快照加载?

干货 | Debezium实现Mysql到Elasticsearch高效实时同步

在意外删除 AWS RDS 二进制日志后恢复 Debezium MySQL 连接器

Debezium的基本使用(以MySQL为例)