为数据库中的多个表配置 debezium 连接器

Posted

技术标签:

【中文标题】为数据库中的多个表配置 debezium 连接器【英文标题】:Configure a debezium connector for multiple tables in a database 【发布时间】:2021-05-20 19:56:41 【问题描述】:

我正在尝试为 mysql 数据库中的多个表配置 Debezium 连接器(我在 MySQL 8.0 上使用 debezium 1.4)。 我的公司在 kafka 中创建主题时有一个命名模式要遵循,而且这个模式不允许使用下划线 (_),所以我不得不用连字符 (-) 替换它们

所以,我的主题名称是:

主题 1

fjf.db.top-domain.domain.sub-domain.transaction-search.order-status
WHERE
- transaction-search = schema "transaction_search"
- order-status = table "order_status". 
- All changes in that table, must go to that topic.

主题 2

fjf.db.top-domain.domain.sub-domain.transaction-search.shipping-tracking
WHERE
- transaction-search = schema "transaction_search"
- shipping-tracking = table "shipping_tracking"
- All changes in that table, must go to that topic.

主题 3

fjf.db.top-domain.domain.sub-domain.transaction-search.proposal
WHERE
- transaction-search = schema "transaction_search"
- proposal = table "proposal"
- All changes in that table, must go to that topic.

我正在尝试使用转换“ByLogicalTableRouter”,但我找不到解决我的情况的正则表达式解决方案。

 "name": "debezium.connector",
 "config":
     
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "myhostname",
"database.port": "3306",
"database.user": "debezium", 
"database.password": "password", 
"database.server.id": "1000", 
"database.server.name": "fjf.db.top-domain.domain.sub-domain.transaction-search",
"schema.include.list": "transaction_search",
"table.include.list": "transaction_search.order_status,transaction_search.shipping_tracking,transaction_search.proposal",
"database.history.kafka.bootstrap.servers": "kafka.intranet:9097",
"database.history.kafka.topic": "fjf.db.top-domain.domain.sub-domain.transaction-search.schema-history",
"snapshot.mode": "schema_only",
"transforms":"RerouteName,RerouteUnderscore",
"transforms.RerouteName.type":"io.debezium.transforms.ByLogicalTableRouter",
"transforms.RerouteName.topic.regex":"(.*)transaction_search(.*)",
"transforms.RerouteName.topic.replacement": "$1$2" 
"transforms.RerouteUnderscore.type":"io.debezium.transforms.ByLogicalTableRouter",
"transforms.RerouteUnderscore.topic.regex":"(.*)_(.*)",
"transforms.RerouteUnderscore.topic.replacement": "$1-$2" 
    

在第一次转换中,我试图删除重复的架构 主题路由中的名称。 在第二个转换中,替换所有 仍然强调 _ 代表 hiphens -

但是,我收到以下错误,这表明它正在尝试将所有内容发送到同一主题

Caused by: org.apache.kafka.connect.errors.SchemaBuilderException: Cannot create field because of field name duplication __dbz__physicalTableIdentifier

如何进行转换,将每个表的事件转发到各自的主题?

【问题讨论】:

【参考方案1】:
    删除架构名称

在第一次转换中,我试图在主题路由中删除重复的模式名称。

用你的正则表达式转换后你会有两个点,所以你需要修复它:

"transforms.RerouteName.topic.regex":"([^.]+)\\.transaction_search\\.([^.]+)",
"transforms.RerouteName.topic.replacement": "$1.$2" 
    用下划线替换 hiphens

您可以尝试使用Kafka Connect Common Transformations中的ChangeCase SMT。

【讨论】:

没有工作:(首先我尝试了这样的“转换”:“RerouteName,changeCase”,“transforms.RerouteName.type”:“io.debezium.transforms.ByLogicalTableRouter”,“transforms.RerouteName。 topic.regex":"([^.]+)\\.transaction_search\\.([^.]+)", "transforms.RerouteName.topic.replacement": "$1.$2", "transforms.changeCase.类型”:“com.github.jcustenborder.kafka.connect.transform.common.ChangeCase$Value”,“transforms.changeCase.from”:“LOWER_UNDERSCORE”,“transforms.changeCase.to”:“LOWER_HYPHEN”并得到错误:引起:org.apache.avro.SchemaParseException:非法字符:ts-ms "transforms":"RerouteName,RerouteUnder", "transforms.RerouteName.type":"io.debezium.transforms.ByLogicalTableRouter", "transforms.RerouteName.topic.regex":"(.* ).card_acquisition.(.*)", "transforms.RerouteName.topic.replacement": "$1.$2", "transforms.RerouteUnder.type":"io.debezium.transforms.ByLogicalTableRouter", "transforms.RerouteUnder.topic .regex":"(.*)_(.*)", "transforms.RerouteUnder.topic.replacement": "$1-$2" 错误是由:org.apache.kafka.connect.errors.SchemaBuilderException:由于字段名称重复而无法创建字段__dbz__physicalTableIdentifier 第一个错误来自AvroConverter,你可以改用JSONConverter。 很遗憾,这个项目我不能使用JSON,我需要使用Avro。 请告诉我,在没有转换的情况下,一切都如你所愿吗(当然主题命名除外)?

以上是关于为数据库中的多个表配置 debezium 连接器的主要内容,如果未能解决你的问题,请参考以下文章

删除“table.include.list”不会强制 Debezium 为数据库中的其余表创建主题

将带有数据的新表包含到现有的 Debezium 连接器中

Debezium 心跳表未更新

KSQLDB - 从 debezium cdc 源连接器获取数据并将 Stream 与表连接

Kafka 连接(Debezium mysql 连接器)只监听 table.include.list 中的特定表

在 Debezium Mysql 连接器中将更多表列入白名单的有效方法