debezium - 模式注册表问题

Posted

技术标签:

【中文标题】debezium - 模式注册表问题【英文标题】:debezium - schema registry issue 【发布时间】:2021-10-07 19:39:58 【问题描述】:

我正在为 debezium 使用 AWS 架构注册表。

在 debezium 中,我提到服务器名称为 mysql-db01。因此,debezium 将使用此服务器名称创建一个主题,以添加有关服务器和架构更改的一些元数据。

当我部署连接器时,在架构注册表中我得到了这样的架构。


  "type": "record",
  "name": "SchemaChangeKey",
  "namespace": "io.debezium.connector.mysql",
  "fields": [
    
      "name": "databaseName",
      "type": "string"
    
  ],
  "connect.name": "io.debezium.connector.mysql.SchemaChangeKey"

然后立即像这样创建了另一个版本。


  "type": "record",
  "name": "SchemaChangeValue",
  "namespace": "io.debezium.connector.mysql",
  "fields": [
    
      "name": "source",
      "type": 
        "type": "record",
        "name": "Source",
        "fields": [
          
            "name": "version",
            "type": "string"
          ,
          
            "name": "connector",
            "type": "string"
          ,
          
            "name": "name",
            "type": "string"
          ,
          
            "name": "ts_ms",
            "type": "long"
          ,
          
            "name": "snapshot",
            "type": [
              
                "type": "string",
                "connect.version": 1,
                "connect.parameters": 
                  "allowed": "true,last,false"
                ,
                "connect.default": "false",
                "connect.name": "io.debezium.data.Enum"
              ,
              "null"
            ],
            "default": "false"
          ,
          
            "name": "db",
            "type": "string"
          ,
          
            "name": "sequence",
            "type": [
              "null",
              "string"
            ],
            "default": null
          ,
          
            "name": "table",
            "type": [
              "null",
              "string"
            ],
            "default": null
          ,
          
            "name": "server_id",
            "type": "long"
          ,
          
            "name": "gtid",
            "type": [
              "null",
              "string"
            ],
            "default": null
          ,
          
            "name": "file",
            "type": "string"
          ,
          
            "name": "pos",
            "type": "long"
          ,
          
            "name": "row",
            "type": "int"
          ,
          
            "name": "thread",
            "type": [
              "null",
              "long"
            ],
            "default": null
          ,
          
            "name": "query",
            "type": [
              "null",
              "string"
            ],
            "default": null
          
        ],
        "connect.name": "io.debezium.connector.mysql.Source"
      
    ,
    
      "name": "databaseName",
      "type": [
        "null",
        "string"
      ],
      "default": null
    ,
    
      "name": "schemaName",
      "type": [
        "null",
        "string"
      ],
      "default": null
    ,
    
      "name": "ddl",
      "type": [
        "null",
        "string"
      ],
      "default": null
    ,
    
      "name": "tableChanges",
      "type": 
        "type": "array",
        "items": 
          "type": "record",
          "name": "Change",
          "namespace": "io.debezium.connector.schema",
          "fields": [
            
              "name": "type",
              "type": "string"
            ,
            
              "name": "id",
              "type": "string"
            ,
            
              "name": "table",
              "type": 
                "type": "record",
                "name": "Table",
                "fields": [
                  
                    "name": "defaultCharsetName",
                    "type": [
                      "null",
                      "string"
                    ],
                    "default": null
                  ,
                  
                    "name": "primaryKeyColumnNames",
                    "type": [
                      "null",
                      
                        "type": "array",
                        "items": "string"
                      
                    ],
                    "default": null
                  ,
                  
                    "name": "columns",
                    "type": 
                      "type": "array",
                      "items": 
                        "type": "record",
                        "name": "Column",
                        "fields": [
                          
                            "name": "name",
                            "type": "string"
                          ,
                          
                            "name": "jdbcType",
                            "type": "int"
                          ,
                          
                            "name": "nativeType",
                            "type": [
                              "null",
                              "int"
                            ],
                            "default": null
                          ,
                          
                            "name": "typeName",
                            "type": "string"
                          ,
                          
                            "name": "typeExpression",
                            "type": [
                              "null",
                              "string"
                            ],
                            "default": null
                          ,
                          
                            "name": "charsetName",
                            "type": [
                              "null",
                              "string"
                            ],
                            "default": null
                          ,
                          
                            "name": "length",
                            "type": [
                              "null",
                              "int"
                            ],
                            "default": null
                          ,
                          
                            "name": "scale",
                            "type": [
                              "null",
                              "int"
                            ],
                            "default": null
                          ,
                          
                            "name": "position",
                            "type": "int"
                          ,
                          
                            "name": "optional",
                            "type": [
                              "null",
                              "boolean"
                            ],
                            "default": null
                          ,
                          
                            "name": "autoIncremented",
                            "type": [
                              "null",
                              "boolean"
                            ],
                            "default": null
                          ,
                          
                            "name": "generated",
                            "type": [
                              "null",
                              "boolean"
                            ],
                            "default": null
                          
                        ],
                        "connect.name": "io.debezium.connector.schema.Column"
                      
                    
                  
                ],
                "connect.name": "io.debezium.connector.schema.Table"
              
            
          ],
          "connect.name": "io.debezium.connector.schema.Change"
        
      
    
  ],
  "connect.name": "io.debezium.connector.mysql.SchemaChangeValue"

这 2 个架构不匹配,因此 AWS 架构注册表不允许连接器注册第二个版本。但第二个版本是连接器的实际架构。

为了解决这个问题,我删除了架构(在架构注册表中)。然后删除连接器,重新部署连接器,然后它工作了。

但我试图理解为什么架构第一次有不同的版本。

【问题讨论】:

一个模式是一个键。另一个是价值。这些不应该是同一个注册表“主题”的一部分......您可以通过不使用 Kafka 的 AvroConverter key.converter 来解决这个问题 嗯,明白了。那么如果我使用 Json 转换器,它会解决这个问题吗? 假设 AWS Schema Registry 不像 Confluent 那样存储 JSON 模式,那么确定 支持avro、Json、protouf 就像我说的,不要使用与注册表集成的key.converter,例如内置的Kafka StringConverter 【参考方案1】:

我在源连接器和接收器连接器上使用了以下键/值转换器来使其工作。

        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schemas.enable": "false",
        "internal.key.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
        "internal.key.converter.schemas.enable": "false",
        "internal.value.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
        "internal.value.converter.schemas.enable": "false",
        "value.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
        "value.converter.schemas.enable": "true",
        "value.converter.region": "ap-south-1",
        "key.converter.schemaAutoRegistrationEnabled": "true",
        "value.converter.schemaAutoRegistrationEnabled": "true",
        "key.converter.avroRecordType": "GENERIC_RECORD",
        "value.converter.avroRecordType": "GENERIC_RECORD",
        "key.converter.registry.name": "bhuvi-debezium",
        "value.converter.registry.name": "bhuvi-debezium",

【讨论】:

以上是关于debezium - 模式注册表问题的主要内容,如果未能解决你的问题,请参考以下文章

Debezium 如何使用 Kafka Connect 正确注册 SqlServer 连接器 - 连接被拒绝

无法在启用 SSL 的 Kafka 集群中注册 Debezium (Kafka-Connect) 连接器

如何删除 debezium 连接器

Debezium kafka 连接连接器未成功更新

成功创建 Always On SQL Server 快照后,Debezium 未跟踪 CDC

Debezium 可能会产生无效的模式