debezium - 模式注册表问题
Posted
技术标签:
【中文标题】debezium - 模式注册表问题【英文标题】:debezium - schema registry issue 【发布时间】:2021-10-07 19:39:58 【问题描述】:我正在为 debezium 使用 AWS 架构注册表。
在 debezium 中,我提到服务器名称为 mysql-db01
。因此,debezium 将使用此服务器名称创建一个主题,以添加有关服务器和架构更改的一些元数据。
当我部署连接器时,在架构注册表中我得到了这样的架构。
"type": "record",
"name": "SchemaChangeKey",
"namespace": "io.debezium.connector.mysql",
"fields": [
"name": "databaseName",
"type": "string"
],
"connect.name": "io.debezium.connector.mysql.SchemaChangeKey"
然后立即像这样创建了另一个版本。
"type": "record",
"name": "SchemaChangeValue",
"namespace": "io.debezium.connector.mysql",
"fields": [
"name": "source",
"type":
"type": "record",
"name": "Source",
"fields": [
"name": "version",
"type": "string"
,
"name": "connector",
"type": "string"
,
"name": "name",
"type": "string"
,
"name": "ts_ms",
"type": "long"
,
"name": "snapshot",
"type": [
"type": "string",
"connect.version": 1,
"connect.parameters":
"allowed": "true,last,false"
,
"connect.default": "false",
"connect.name": "io.debezium.data.Enum"
,
"null"
],
"default": "false"
,
"name": "db",
"type": "string"
,
"name": "sequence",
"type": [
"null",
"string"
],
"default": null
,
"name": "table",
"type": [
"null",
"string"
],
"default": null
,
"name": "server_id",
"type": "long"
,
"name": "gtid",
"type": [
"null",
"string"
],
"default": null
,
"name": "file",
"type": "string"
,
"name": "pos",
"type": "long"
,
"name": "row",
"type": "int"
,
"name": "thread",
"type": [
"null",
"long"
],
"default": null
,
"name": "query",
"type": [
"null",
"string"
],
"default": null
],
"connect.name": "io.debezium.connector.mysql.Source"
,
"name": "databaseName",
"type": [
"null",
"string"
],
"default": null
,
"name": "schemaName",
"type": [
"null",
"string"
],
"default": null
,
"name": "ddl",
"type": [
"null",
"string"
],
"default": null
,
"name": "tableChanges",
"type":
"type": "array",
"items":
"type": "record",
"name": "Change",
"namespace": "io.debezium.connector.schema",
"fields": [
"name": "type",
"type": "string"
,
"name": "id",
"type": "string"
,
"name": "table",
"type":
"type": "record",
"name": "Table",
"fields": [
"name": "defaultCharsetName",
"type": [
"null",
"string"
],
"default": null
,
"name": "primaryKeyColumnNames",
"type": [
"null",
"type": "array",
"items": "string"
],
"default": null
,
"name": "columns",
"type":
"type": "array",
"items":
"type": "record",
"name": "Column",
"fields": [
"name": "name",
"type": "string"
,
"name": "jdbcType",
"type": "int"
,
"name": "nativeType",
"type": [
"null",
"int"
],
"default": null
,
"name": "typeName",
"type": "string"
,
"name": "typeExpression",
"type": [
"null",
"string"
],
"default": null
,
"name": "charsetName",
"type": [
"null",
"string"
],
"default": null
,
"name": "length",
"type": [
"null",
"int"
],
"default": null
,
"name": "scale",
"type": [
"null",
"int"
],
"default": null
,
"name": "position",
"type": "int"
,
"name": "optional",
"type": [
"null",
"boolean"
],
"default": null
,
"name": "autoIncremented",
"type": [
"null",
"boolean"
],
"default": null
,
"name": "generated",
"type": [
"null",
"boolean"
],
"default": null
],
"connect.name": "io.debezium.connector.schema.Column"
],
"connect.name": "io.debezium.connector.schema.Table"
],
"connect.name": "io.debezium.connector.schema.Change"
],
"connect.name": "io.debezium.connector.mysql.SchemaChangeValue"
这 2 个架构不匹配,因此 AWS 架构注册表不允许连接器注册第二个版本。但第二个版本是连接器的实际架构。
为了解决这个问题,我删除了架构(在架构注册表中)。然后删除连接器,重新部署连接器,然后它工作了。
但我试图理解为什么架构第一次有不同的版本。
【问题讨论】:
一个模式是一个键。另一个是价值。这些不应该是同一个注册表“主题”的一部分......您可以通过不使用 Kafka 的 AvroConverterkey.converter
来解决这个问题
嗯,明白了。那么如果我使用 Json 转换器,它会解决这个问题吗?
假设 AWS Schema Registry 不像 Confluent 那样存储 JSON 模式,那么确定
支持avro、Json、protouf
就像我说的,不要使用与注册表集成的key.converter
,例如内置的Kafka StringConverter
【参考方案1】:
我在源连接器和接收器连接器上使用了以下键/值转换器来使其工作。
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "false",
"internal.key.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
"internal.key.converter.schemas.enable": "false",
"internal.value.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
"internal.value.converter.schemas.enable": "false",
"value.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
"value.converter.schemas.enable": "true",
"value.converter.region": "ap-south-1",
"key.converter.schemaAutoRegistrationEnabled": "true",
"value.converter.schemaAutoRegistrationEnabled": "true",
"key.converter.avroRecordType": "GENERIC_RECORD",
"value.converter.avroRecordType": "GENERIC_RECORD",
"key.converter.registry.name": "bhuvi-debezium",
"value.converter.registry.name": "bhuvi-debezium",
【讨论】:
以上是关于debezium - 模式注册表问题的主要内容,如果未能解决你的问题,请参考以下文章
Debezium 如何使用 Kafka Connect 正确注册 SqlServer 连接器 - 连接被拒绝
无法在启用 SSL 的 Kafka 集群中注册 Debezium (Kafka-Connect) 连接器