Hard delete events on kafka-connect to sync databases don't work or give an error

Posted: 2021-08-06 07:08:13

Problem description: I have connected my Postgres database to sync to a MySQL database.
Create and update events work fine on the sink, but when I delete a row at the source (not just the data in a column) it fails.
I have tried a few things, with no luck.
1 - When I don't put "createKey" and "extractInt" in the MySQL sink's "transforms", I get an error and the column is not created as bigserial:
"BLOB/TEXT column 'id_consultor' used in key specification without a key length".
2 - However, if I do put "createKey" and "extractInt" in my config, create and update work fine, but delete events fail with this error (see the sketch after this list):
"Only Map objects supported in absence of schema for [copying fields from value to key], found: null".
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"id_consultor",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "id_consultor"
3 - If I put this into my source (Postgres) connector:
"transforms.unwrap.delete.handling.mode": "rewrite"
the delete only performs a "soft delete": the row is not removed, all its data is just cleared and the non-null fields are left filled with 0.
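A note on the error in attempt 2: a Debezium delete is followed by a tombstone record whose value is null, so ValueToKey has nothing to copy the key field from, which is exactly what "Only Map objects supported in absence of schema ... found: null" is complaining about. A minimal sketch of one possible workaround, assuming Kafka Connect 2.6+ (KIP-585 predicates) and that the tombstone's original Debezium key already contains id_consultor, is to skip createKey for tombstones so they pass through with their original key:

"predicates": "isTombstone",
"predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
"transforms.createKey.predicate": "isTombstone",
"transforms.createKey.negate": "true"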
Can someone help me? Thank you!
Postgres connector:

{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "**",
    "database.port": "5432",
    "database.user": "**",
    "database.password": "**",
    "database.dbname": "**",
    "database.server.name": "kafkaPostgres",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "history",
    "schema.include.list": "public",
    "table.include.list": "public.consultor",
    "time.precision.mode": "connect",
    "tombstones.on.delete": "true",
    "plugin.name": "pgoutput",
    "transforms": "unwrap, dropPrefix",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.add.fields": "table,lsn",
    "transforms.unwrap.add.headers": "db",
    "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropPrefix.regex": "kafkaPostgres.public.(.*)",
    "transforms.dropPrefix.replacement": "$1"
  }
}
MySQL sink:

{
  "name": "mysql-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "consultor",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "connection.url": "**",
    "connection.user": "**",
    "connection.password": "**",
    "auto.create": "true",
    "auto.evolve": "true",
    "insert.mode": "upsert",
    "dialect.name": "MySqlDatabaseDialect",
    "table.name.format": "consultor",
    "pk.mode": "record_key",
    "pk.fields": "id_consultor",
    "delete.enabled": "true",
    "drop.invalid.message": "true",
    "delete.retention.ms": 1,
    "fields.whitelist": "id_consultor, idempresaorganizacional, cd_consultor_cpf, dt_consultor_nascimento, ds_justificativa, nn_consultor, cd_consultor_rg, id_motivo, id_situacao, id_sub_motivo",
    "transforms": "unwrap, flatten, route, createKey, extractInt",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
    "transforms.flatten.delimiter": ".",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "(?:[^.]+)\\.(?:[^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$1",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "id_consultor",
    "transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractInt.field": "id_consultor"
  }
}
Solution 1: I added these properties on the connector:
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.url" :"http://apicurio:8080/api",
"key.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url":"http://apicurio:8080/api",
"value.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "true",
and on the sink replaced this:
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
with this:
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.url" :"http://apicurio:8080/api",
"key.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url":"http://apicurio:8080/api",
"value.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "true",
Everything runs fine, but I can't read the messages in the topic, because I'm using the Debezium Kafka image and it has no Avro console reader.
Now I'm trying some plugins for this version to read the Avro messages.
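One option for reading Avro topics without the Confluent console tools is kcat (formerly kafkacat), provided it was built with Avro/Schema Registry support; a sketch, assuming Apicurio exposes its Confluent-compatible API at /api/ccompat (the exact path depends on the Apicurio version):

kcat -b kafka:9092 -t consultor -C \
  -s key=avro -s value=avro \
  -r http://apicurio:8080/api/ccompat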
Hope this helps.