为啥我会收到很多带有 debezium 的重复项?
Posted
技术标签:
【中文标题】为啥我会收到很多带有 debezium 的重复项?【英文标题】:Why I receive a lot of duplicates with debezium?为什么我会收到很多带有 debezium 的重复项? 【发布时间】:2020-11-11 17:53:39 【问题描述】:我正在使用 docker-compose 在本地部署中测试 Debezium 平台。这是我的测试用例:
-
运行 postgres、kafka、zookeeper 和 debezium/connect:1.3 的 3 个副本
使用以下配置在其中一个副本中配置连接器:
"name": "database-connector",
"config":
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "wal2json",
"slot.name": "database",
"database.hostname": "debezium_postgis_1",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "database",
"database.server.name": "database",
"heartbeat.interval.ms": 5000,
"table.whitelist": "public.outbox",
"transforms.outbox.table.field.event.id": "event_uuid",
"transforms.outbox.table.field.event.key": "event_name",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.table.field.event.payload.id": "event_uuid",
"transforms.outbox.route.topic.replacement": "$routedByValue",
"transforms.outbox.route.by.field": "topic",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"max.batch.size": 1,
"offset.commit.policy": "io.debezium.engine.spi.OffsetCommitPolicy.AlwaysCommitOffsetPolicy",
"binary.handling.mode": "bytes"
-
通过从另一个类调用此方法来运行一个脚本,该脚本在发件箱表中执行 2000 次插入
@Transactional
public void write(String eventName, String topic, byte[] payload)
Outbox newRecord = new Outbox(eventName, topic, payload);
repository.save(newRecord);
repository.delete(newRecord);
-
几秒钟后(当我在 Kafka 上看到第一条消息时),我杀死了正在处理流的副本。假设它成功发送了 200 条关于正确主题的消息。
我从 debezium 存储偏移量的主题得到最后一条偏移量消息:
"transaction_id": null,
"lsn_proc": 24360992,
"lsn": 24495808,
"txId": 560,
"ts_usec": 1595337502556806
-
然后我打开一个 db shell 并运行以下命令
SELECT slot_name, restart_lsn - pg_lsn('0/0') as restart_lsn, confirmed_flush_lsn - pg_lsn('0/0') as confirmed_flush_lsn FROM pg_replication_slots;
和 postgres 回复:
[
"slot_name": "database",
"restart_lsn": 24360856,
"confirmed_flush_lsn": 24360992
]
-
我杀死副本 5 分钟后,Kafka 重新平衡连接器,并在其中一个活动副本上部署新的运行任务。
新的连接器开始处理流,但它似乎从头开始,因为完成后我在 Kafka 上发现了 2200 条消息。
使用该配置(
max.batch.size: 1
和 AlwaysCommitPolicy
)我希望看到最多 2001 条消息。
我哪里错了?
【问题讨论】:
【参考方案1】:我在我的配置中发现了问题:
"offset.commit.policy": "io.debezium.engine.spi.OffsetCommitPolicy.AlwaysCommitOffsetPolicy"
仅适用于嵌入式 API。
此外,debezium/connect:1.3 docker 映像的 OFFSET_FLUSH_INTERVAL_MS 的默认值为 1 分钟。因此,如果我在容器的前 1 分钟内停止容器,则不会在 kafka 上存储任何偏移量
【讨论】:
以上是关于为啥我会收到很多带有 debezium 的重复项?的主要内容,如果未能解决你的问题,请参考以下文章
为啥我会收到带有 std::bind() 的“本地临时返回地址”警告?
为啥我会收到 50% 的 GCP Pub/Sub 消息重复?