Debezium:从 MySQL 中删除的数据重新出现在 Elasticsearch 中

Posted

技术标签:

【中文标题】Debezium:从 MySQL 中删除的数据重新出现在 Elasticsearch 中【英文标题】:Debezium : Data deleted from MySQL reappearing in Elasticsearch 【发布时间】:2021-06-12 05:18:02 【问题描述】:

我使用 Debezium for mysql -> Elasticsearch CDC。 现在,问题是当我从 MySQL 中删除数据时,它仍然会重新出现在 Elasticsearch 中,即使 MySQL 数据库中不再存在数据。 UPDATE 和 INSERT 工作正常,但 DELETE 不是。 另外,我做了以下事情:

    删除 MySQL 中的数据
    删除 Elasticsearch 索引和 ES Kafka Sink
    在 Kakfa 中为 ES 创建新的连接器

现在,奇怪的是,我所有已删除的数据也在这里重新出现!当我在步骤 (3) 之前检查 ES 数据时,数据不存在。但之后,观察到这种行为。 请帮我解决这个问题!

MySQL 配置:

 "config": 
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.allowPublicKeyRetrieval": "true",
    "database.user": "cdc-reader",
    "tasks.max": "1",
    "database.history.kafka.bootstrap.servers": "X.X.X.X:9092",
    "database.history.kafka.topic": "schema-changes.mysql",
    "database.server.name": "data_test",
    "schema.include.list": "data_test",
    "database.port": "3306",
    "tombstones.on.delete": "true",
    "delete.enabled": "true",
    "database.hostname": "X.X.X.X",
    "database.password": "xxxxx",
    "name": "slave_test",
    "database.history.skip.unparseable.ddl": "true",
    "table.include.list": "search_ai.*"
  ,

弹性搜索配置:

  "config": 
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type.name": "_doc",
    "behavior.on.null.values": "delete",
    "transforms.extractKey.field": "ID",
    "tasks.max": "1",
    "topics": "search_ai.search_ai.slave_data",
    "transforms.InsertKey.fields": "ID",
    "transforms": "unwrap,key,InsertKey,extractKey",
    "key.ignore": "false",
    "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.key.field": "ID",
    "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "name": "esd_2",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "connection.url": "http://X.X.X.X:9200",
    "transforms.InsertKey.type": "org.apache.kafka.connect.transforms.ValueToKey"
  ,

【问题讨论】:

【参考方案1】:

Debezium 正在读取事务日志,而不是源表,因此插入和更新总是会先被读取,从而导致 Elasticsearch 中的插入和文档更新...

其次,您是否使用新名称或不同名称创建了接收器连接器?

如果是同一个,原来的消费者组偏移量不会改变,导致消费者组在你删除原来的连接器之前的偏移量处拾取 如果是新名称,并且取决于接收器连接器使用者的 auto.offset.reset 值,您可能会从一开始就使用 Debezium 主题,并导致数据重新插入 Elasticsearch,如前所述。您需要检查您的 Mysql 删除事件是否实际上作为墓碑值生成/使用以导致 Elasticsearch 中的删除

【讨论】:

我用新名称创建了第二个连接器。我已将 MySQL 的 "tombstones.on.delete" 设置为 "true",并在 ES sink 中设置 "behavior.on.null.values": "delete"。这够了吗?另外,在 Kafka 消息中,我得到 before : "ID":1,.... after:null op : 'd' 但是,delete 仍然不起作用 您确定ExtractNewRecordState 转换仅返回after 值吗?否则,仅该记录不会导致删除。您可以使用 FileStreamSink 来测试您是否获得了预期的连接器输出 它返回之前和之后的值,之后设置为 null 作为 "after":null 对。并且整个 Kafka 记录需要为 (ID,null) 才能删除 Elasticsearch 中的文档 ID,因此请继续调试转换配置。例如,也许您想将 drop.tombstones 设置为 false debezium.io/documentation/reference/configuration/… 感谢您的帮助。发现没有为我的所有 MySQL 表设置 PK。这导致了两者之间的空值。

以上是关于Debezium:从 MySQL 中删除的数据重新出现在 Elasticsearch 中的主要内容,如果未能解决你的问题,请参考以下文章

MySQL 数据库到 BigQuery 的传输

在意外删除 AWS RDS 二进制日志后恢复 Debezium MySQL 连接器

多个表之间 CDC 事件的 Debezium 排序

在 Debezium 中无法根据 MySQL 的表创建某些主题

有没有办法删除debezium mysql连接器的现有任务并用新任务替换它

Debezium - 自定义负载 - MySQL 连接器