Kafka Connect 如何使命名空间与数据库名称无关
Posted
技术标签:
【中文标题】Kafka Connect 如何使命名空间与数据库名称无关【英文标题】:Kafka Connect How to make namespace agnostic to database name 【发布时间】:2020-09-10 12:41:37 【问题描述】:我的环境
MySQL(5.7):我们有多个模式,命名约定是 application_name_env。
示例:假设我们有两个应用:app1 和 app2
开发环境:数据库名称将是 app1_dev、app2_dev
QA 环境: 数据库名称将是 app1_qa、app2_qa。
Debezium(0.8.3)。该插件用于 CDC mysql 日志。
连接器配置为:
"name": "connector-1",
"config":
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory",
"decimal.handling.mode": "double",
"snapshot.mode": "when_needed",
"table.whitelist":"database_name.account",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"transforms" : "setSchema",
"transforms.setSchema.type" : "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.setSchema.schema.name" : "com.test.Account"
Spring Java 应用程序 我正在使用 Kafka Consumer(@KafkaListener) 来读取来自 Debezium 事件的更改。
我提供了 avsc 文件并使用 gradle avro 插件来生成类。
来自开发环境的架构
"type":"record",
"name":"Accounts",
"namespace":"com.test",
"fields":[
"name":"before",
"type":[
"null",
"type":"record",
"name":"Value",
"namespace":"dbserver1.app1_dev.account",
"fields":[
"name":"id",
"type":"long"
],
"connect.name":"dbserver1.app1_dev.account.Value"
],
"default":null
,
"name":"after",
"type":[
"null",
"dbserver1.app1_dev.account.Value"
],
"default":null
,
"name":"source",
"type":
"type":"record",
"name":"Source",
"namespace":"io.debezium.connector.mysql",
"fields":[
"name":"version",
"type":[
"null",
"string"
],
"default":null
,
"name":"name",
"type":"string"
,
"name":"server_id",
"type":"long"
,
"name":"ts_sec",
"type":"long"
,
"name":"gtid",
"type":[
"null",
"string"
],
"default":null
,
"name":"file",
"type":"string"
,
"name":"pos",
"type":"long"
,
"name":"row",
"type":"int"
,
"name":"snapshot",
"type":[
"type":"boolean",
"connect.default":false
,
"null"
],
"default":false
,
"name":"thread",
"type":[
"null",
"long"
],
"default":null
,
"name":"db",
"type":[
"null",
"string"
],
"default":null
,
"name":"table",
"type":[
"null",
"string"
],
"default":null
,
"name":"query",
"type":[
"null",
"string"
],
"default":null
],
"connect.name":"io.debezium.connector.mysql.Source"
,
"name":"op",
"type":"string"
,
"name":"ts_ms",
"type":[
"null",
"long"
],
"default":null
],
"connect.name":"com.test.Account"
问题: 由于我的数据库模式是动态的,即它们以 env 后缀结尾。
每个环境中生成的 Schema 都有不同的命名空间。
开发:dev.app1_dev.accounts 质量检查:dev.app1_qa.accounts
由于命名空间不同,我无法在 QA 中反序列化我的开发代码。所以如果使用在 Dev 中生成的 schema,代码将无法在 QA 中运行。
我想确保命名空间在所有环境中都是一致的。
【问题讨论】:
你能解决这个问题吗?如果您有一些指导,将不胜感激。谢谢! 【参考方案1】:请使用org.apache.kafka.connect.transforms.SetSchemaMetadata
SMT - 见https://github.com/a0x8o/kafka/blob/master/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
【讨论】:
我已经在使用它了 :) 问题是在嵌套级别,命名空间没有改变。以上是关于Kafka Connect 如何使命名空间与数据库名称无关的主要内容,如果未能解决你的问题,请参考以下文章
kafka-connect JDBC PostgreSQL Sink Connector 显式定义 PostgrSQL 模式(命名空间)