为啥我会收到很多带有 debezium 的重复项?

Posted

技术标签:

【中文标题】为啥我会收到很多带有 debezium 的重复项?【英文标题】:Why I receive a lot of duplicates with debezium?为什么我会收到很多带有 debezium 的重复项? 【发布时间】:2020-11-11 17:53:39 【问题描述】:

我正在使用 docker-compose 在本地部署中测试 Debezium 平台。这是我的测试用例:

    运行 postgres、kafka、zookeeper 和 debezium/connect:1.3 的 3 个副本 使用以下配置在其中一个副本中配置连接器:

  "name": "database-connector",  
  "config": 
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", 
    "plugin.name": "wal2json",
    "slot.name": "database",
    "database.hostname": "debezium_postgis_1", 
    "database.port": "5432", 
    "database.user": "postgres", 
    "database.password": "postgres", 
    "database.dbname" : "database", 
    "database.server.name": "database",
    "heartbeat.interval.ms": 5000,
    "table.whitelist": "public.outbox",
    "transforms.outbox.table.field.event.id": "event_uuid",
    "transforms.outbox.table.field.event.key": "event_name",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.table.field.event.payload.id": "event_uuid",
    "transforms.outbox.route.topic.replacement": "$routedByValue",
    "transforms.outbox.route.by.field": "topic",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "max.batch.size": 1,
    "offset.commit.policy": "io.debezium.engine.spi.OffsetCommitPolicy.AlwaysCommitOffsetPolicy",
    "binary.handling.mode": "bytes"
  

    通过从另一个类调用此方法来运行一个脚本,该脚本在发件箱表中执行 2000 次插入
    @Transactional
    public void write(String eventName, String topic, byte[] payload) 
        Outbox newRecord = new Outbox(eventName, topic, payload);
        repository.save(newRecord);
        repository.delete(newRecord);
    
    几秒钟后(当我在 Kafka 上看到第一条消息时),我杀死了正在处理流的副本。假设它成功发送了 200 条关于正确主题的消息。 我从 debezium 存储偏移量的主题得到最后一条偏移量消息:

   "transaction_id": null,
   "lsn_proc": 24360992,
   "lsn": 24495808,
   "txId": 560,
   "ts_usec": 1595337502556806

    然后我打开一个 db shell 并运行以下命令 SELECT slot_name, restart_lsn - pg_lsn('0/0') as restart_lsn, confirmed_flush_lsn - pg_lsn('0/0') as confirmed_flush_lsn FROM pg_replication_slots; 和 postgres 回复:
[
  
    "slot_name": "database",
    "restart_lsn": 24360856,
    "confirmed_flush_lsn": 24360992
  
]
    我杀死副本 5 分钟后,Kafka 重新平衡连接器,并在其中一个活动副本上部署新的运行任务。 新的连接器开始处理流,但它似乎从头开始,因为完成后我在 Kafka 上发现了 2200 条消息。 使用该配置(max.batch.size: 1AlwaysCommitPolicy)我希望看到最多 2001 条消息。 我哪里错了?

【问题讨论】:

【参考方案1】:

我在我的配置中发现了问题: "offset.commit.policy": "io.debezium.engine.spi.OffsetCommitPolicy.AlwaysCommitOffsetPolicy" 仅适用于嵌入式 API。

此外,debezium/connect:1.3 docker 映像的 OFFSET_FLUSH_INTERVAL_MS 的默认值为 1 分钟。因此,如果我在容器的前 1 分钟内停止容器,则不会在 kafka 上存储任何偏移量

【讨论】:

以上是关于为啥我会收到很多带有 debezium 的重复项?的主要内容,如果未能解决你的问题,请参考以下文章

为啥我会收到带有 std::bind() 的“本地临时返回地址”警告?

为啥我会收到 50% 的 GCP Pub/Sub 消息重复?

为啥我会收到“权限被拒绝”?

为啥我会得到额外的(1 行受影响)[重复]

带有 kafka 的 Debezium 还是只有嵌入式 Debezium?

为啥我会收到 ConcurrentModificationException?