Kafka Connect 如何使命名空间与数据库名称无关

Posted

技术标签:

【中文标题】Kafka Connect 如何使命名空间与数据库名称无关【英文标题】:Kafka Connect How to make namespace agnostic to database name 【发布时间】:2020-09-10 12:41:37 【问题描述】:

我的环境

MySQL(5.7):我们有多个模式,命名约定是 application_name_env。

示例:假设我们有两个应用:app1 和 app2

开发环境:数据库名称将是 app1_dev、app2_dev

QA 环境: 数据库名称将是 app1_qa、app2_qa。

Debezium(0.8.3)。该插件用于 CDC mysql 日志。

连接器配置为:


"name": "connector-1",
"config": 
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "decimal.handling.mode": "double",
    "snapshot.mode": "when_needed",
    "table.whitelist":"database_name.account",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "transforms" : "setSchema",
    "transforms.setSchema.type" : "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
    "transforms.setSchema.schema.name" : "com.test.Account"

Spring Java 应用程序 我正在使用 Kafka Consumer(@KafkaListener) 来读取来自 Debezium 事件的更改。

我提供了 avsc 文件并使用 gradle avro 插件来生成类。

来自开发环境的架构


"type":"record",
"name":"Accounts",
"namespace":"com.test",
"fields":[
  
     "name":"before",
     "type":[
        "null",
        
           "type":"record",
           "name":"Value",
           "namespace":"dbserver1.app1_dev.account",
           "fields":[
              
                 "name":"id",
                 "type":"long"
              
           ],
           "connect.name":"dbserver1.app1_dev.account.Value"
        
     ],
     "default":null
  ,
  
     "name":"after",
     "type":[
        "null",
        "dbserver1.app1_dev.account.Value"
     ],
     "default":null
  ,
  
     "name":"source",
     "type":
        "type":"record",
        "name":"Source",
        "namespace":"io.debezium.connector.mysql",
        "fields":[
           
              "name":"version",
              "type":[
                 "null",
                 "string"
              ],
              "default":null
           ,
           
              "name":"name",
              "type":"string"
           ,
           
              "name":"server_id",
              "type":"long"
           ,
           
              "name":"ts_sec",
              "type":"long"
           ,
           
              "name":"gtid",
              "type":[
                 "null",
                 "string"
              ],
              "default":null
           ,
           
              "name":"file",
              "type":"string"
           ,
           
              "name":"pos",
              "type":"long"
           ,
           
              "name":"row",
              "type":"int"
           ,
           
              "name":"snapshot",
              "type":[
                 
                    "type":"boolean",
                    "connect.default":false
                 ,
                 "null"
              ],
              "default":false
           ,
           
              "name":"thread",
              "type":[
                 "null",
                 "long"
              ],
              "default":null
           ,
           
              "name":"db",
              "type":[
                 "null",
                 "string"
              ],
              "default":null
           ,
           
              "name":"table",
              "type":[
                 "null",
                 "string"
              ],
              "default":null
           ,
           
              "name":"query",
              "type":[
                 "null",
                 "string"
              ],
              "default":null
           
        ],
        "connect.name":"io.debezium.connector.mysql.Source"
     
   ,
   
     "name":"op",
     "type":"string"
   ,
   
     "name":"ts_ms",
     "type":[
        "null",
        "long"
     ],
     "default":null
   
 ],
 "connect.name":"com.test.Account"
 

问题: 由于我的数据库模式是动态的,即它们以 env 后缀结尾。

每个环境中生成的 Schema 都有不同的命名空间。

开发:dev.app1_dev.accounts 质量检查:dev.app1_qa.accounts

由于命名空间不同,我无法在 QA 中反序列化我的开发代码。所以如果使用在 Dev 中生成的 schema,代码将无法在 QA 中运行。

我想确保命名空间在所有环境中都是一致的。

【问题讨论】:

你能解决这个问题吗?如果您有一些指导,将不胜感激。谢谢! 【参考方案1】:

请使用org.apache.kafka.connect.transforms.SetSchemaMetadata SMT - 见https://github.com/a0x8o/kafka/blob/master/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java

【讨论】:

我已经在使用它了 :) 问题是在嵌套级别,命名空间没有改变。

以上是关于Kafka Connect 如何使命名空间与数据库名称无关的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Connect 堆空间不足

kafka-connect JDBC PostgreSQL Sink Connector 显式定义 PostgrSQL 模式(命名空间)

Kafka Connect 如何构建实时数据管道

启用 SSL 后 Kafka Connect 超出 Java 堆空间

Kafka Connect 与 Amazon MSK

在Kafka Connect中,如何连接多个kafka集群?