Kafka Producer 无法验证没有 PK 的记录并返回 InvalidRecordException

Posted

技术标签:

【中文标题】Kafka Producer 无法验证没有 PK 的记录并返回 InvalidRecordException【英文标题】:Kafka Producer cannot validate record wihout PK and return InvalidRecordException 【发布时间】:2020-07-26 16:58:19 【问题描述】:

我的 kafka 制作人有错误。我使用 Debezium Kafka 连接器 V1.1.0 Final 和 Kafka 2.4.1 。对于有 pk 的表,所有表都被清空了,但不幸的是,对于没有 pk 的表,它给了我这个错误:

[2020-04-14 10:00:00,096] INFO   Exporting data from table 'public.table_0' (io.debezium.relational.RelationalSnapshotChangeEventSource:280)
[2020-04-14 10:00:00,097] INFO   For table 'public.table_0' using select statement: 'SELECT * FROM "public"."table_0"' (io.debezium.relational.RelationalSnapshotChangeEventSource:287)
[2020-04-14 10:00:00,519] INFO   Finished exporting 296 records for table 'public.table_0'; total duration '00:00:00.421' (io.debezium.relational.RelationalSnapshotChangeEventSource:330)
[2020-04-14 10:00:00,522] INFO Snapshot - Final stage (io.debezium.pipeline.source.AbstractSnapshotChangeEventSource:79)
[2020-04-14 10:00:00,523] INFO Snapshot ended with SnapshotResult [status=COMPLETED, offset=PostgresOffsetContext [sourceInfo=source_info[server='postgres'db='xxx, lsn=38/C74913C0, txId=4511542, timestamp=2020-04-14T02:00:00.517Z, snapshot=FALSE, schema=public, table=table_0], partition=server=postgres, lastSnapshotRecord=true]] (io.debezium.pipeline.ChangeEventSourceCoordinator:90)
[2020-04-14 10:00:00,524] INFO Connected metrics set to 'true' (io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics:59)
[2020-04-14 10:00:00,526] INFO Starting streaming (io.debezium.pipeline.ChangeEventSourceCoordinator:100)
[2020-04-14 10:00:00,550] ERROR WorkerSourceTaskid=pg_dev_pinjammodal-0 failed to send record to table_0: (org.apache.kafka.connect.runtime.WorkerSourceTask:347)
org.apache.kafka.common.InvalidRecordException: This record has failed the validation on broker and hence be rejected.

我检查了表格,它似乎是有效的记录。我在我的配置中设置了我的制作人producer.ack=1。这个配置是不是在这里触发了失效?

【问题讨论】:

【参考方案1】:

问题是为需要密钥的非 PK 表创建带有日志压缩的 Kafka 主题。消息没有键,因为表没有 PK。这会导致代理无法验证 Kafka 消息。

解决方案是不对主题设置日志压缩和/或不预先创建这些主题。另一种选择是将 PK 添加到表中。

【讨论】:

我在创建主题时收到.InvalidRecordException: One or more records have been rejected 错误,因为我使用的是--config cleanup.policy=compact 选项。删除此选项解决了我的问题 如果我们没有安装 kSql,你如何设法将 PK 添加到这些表中?我在重新启动架构注册表时遇到了同样的问题。 我不使用 KSQL 或 confluent,从头开始依赖纯 kafka。因此,在您的情况下@Javier,唯一的方法是关闭表的接收器,截断它并重组表 PK。或者在其他情况下,如果故意不使用 PK,我建议不要对该表使用压缩。 感谢您的回答,我使用融合图像但社区版,儿子没有任何类型的可视化 IDE,也没有 kSql,但我使用 kafka 提供的脚本创建主题和模式,所以我不知道您的意思是什么表,我仍然不知道我们需要压缩,但现在尝试保留记录似乎是公平的。我怎样才能到达你的意思这些桌子?它们是主题表还是模式? 按照 Kafka 教程,我偶然发现了同样的问题。您可能想在回答中提及具体参数--config cleanup.policy=compact

以上是关于Kafka Producer 无法验证没有 PK 的记录并返回 InvalidRecordException的主要内容,如果未能解决你的问题,请参考以下文章

apache kafka系列之Producer处理逻辑

Kafka SSL 握手失败问题

Kafka消息送达语义详解

Kafka生产者producer简要总结

Kafka Producer Retries & Idempotence 原理

来自Kafka Producer的过多控制台消息