Hard delete events on kafka-connect to sync databases don't work or give an error

Posted: 2021-08-06 07:08:13

Question:

I have connected my Postgres database to sync to a MySQL database.

Create and update events work fine on the sink, but when I delete a row on the source (not just the data in a column), it throws an error.

I have tried a few things, but with no luck.

1 - When I don't put "createKey" and "extractInt" in the MySQL sink's "transforms", I get an error and the column is not created as bigserial:

"BLOB/TEXT column 'id_consultor' used in key specification without a key length".
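This error comes from MySQL itself: with the sink's StringConverter key, the record key is a plain string, so auto.create declares the primary-key column as TEXT, and MySQL refuses to index a TEXT column without an explicit key length. For illustration only, assuming a schema-aware key converter such as JsonConverter with schemas.enable, a key carrying an integer schema would instead map to BIGINT and sidestep the length requirement:

    {
      "schema": { "type": "int64", "optional": false },
      "payload": 123
    }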

2 - However, if I do put "createKey" and "extractInt" in my config, create and update work fine, but delete events throw this error:

"Only Map objects supported in absence of schema for [copying fields from value to key], found: null".

"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"id_consultor",   
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "id_consultor"

3 - If I put this on my source (Postgres):

**"transforms.unwrap.delete.handling.mode":"rewrite"**

the delete becomes a "soft delete": it does not remove the row, it just clears all the data and leaves the non-null fields filled with 0.
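That is exactly what rewrite mode is meant to do: instead of propagating the delete, ExtractNewRecordState emits a value record with a __deleted marker (plus the __table/__lsn fields from add.fields), roughly like this (values illustrative):

    {
      "id_consultor": 123,
      "nn_consultor": null,
      "__deleted": "true",
      "__table": "consultor",
      "__lsn": 37094704
    }

An upserting sink happily writes that row back, which is why the non-null columns end up zero-filled; an actual row deletion has to come from the tombstone instead.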

Can someone help me? Thanks!

Postgres connector:

  "name": "postgres-connector",
  "config": 
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "**",
    "database.port": "5432",
    "database.user": "**",
    "database.password": "**",
    "database.dbname" : "**",
    "database.server.name": "kafkaPostgres",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "history",
    "schema.include.list": "public",
    "table.include.list": "public.consultor",
    "time.precision.mode": "connect",
    "tombstones.on.delete": "true",
    "plugin.name": "pgoutput",
    "transforms": "unwrap, dropPrefix",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode":"rewrite",
    "transforms.unwrap.add.fields": "table,lsn",
    "transforms.unwrap.add.headers": "db",
    "transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropPrefix.regex":"kafkaPostgres.public.(.*)",
    "transforms.dropPrefix.replacement":"$1"

MySQL sink:

"name": "mysql-sink",
  "config": 
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "consultor",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "connection.url": "**,
    "connection.user":"**",
    "connection.password":"**",
    "auto.create": "true",
    "auto.evolve": "true",
    "insert.mode": "upsert",
    "dialect.name": "MySqlDatabaseDialect",
    "Database Dialect": "MySqlDatabaseDialect",
    "table.name.format": "consultor",
    "pk.mode": "record_key",
    "pk.fields": "id_consultor",
    "delete.enabled": "true",
    "drop.invalid.message": "true",
    "delete.retention.ms": 1,
    "fields.whitelist": "id_consultor, idempresaorganizacional, cd_consultor_cpf, dt_consultor_nascimento , ds_justificativa, nn_consultor , cd_consultor_rg, id_motivo, id_situacao , id_sub_motivo",
    "transforms": "unwrap, flatten, route, createKey, extractInt ",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode":"rewrite",
    "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
    "transforms.flatten.delimiter": ".",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "(?:[^.]+)\\.(?:[^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$1",
    "transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields":"id_consultor",   
    "transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractInt.field": "id_consultor"


Answer 1:

I added these properties on the connector:

"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.url" :"http://apicurio:8080/api",
"key.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url":"http://apicurio:8080/api",
"value.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "true",

and on the sink replaced this:

 "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",

with:

"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.url" :"http://apicurio:8080/api",
"key.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url":"http://apicurio:8080/api",
"value.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "true",

Everything runs fine now, but I cannot read the messages in the topic, because I am using the Debezium Kafka image and it does not ship an Avro console consumer.

I am now trying some plugins for this version to read the Avro messages.

Hope this helps.
