MongoDB Sink 连接器:Apache Kafka 中的消息被截断
Posted
技术标签:
【中文标题】MongoDB Sink 连接器:Apache Kafka 中的消息被截断【英文标题】:MongoDB Sink Connector : Message truncated in Apache Kafka 【发布时间】:2021-12-22 08:12:32 【问题描述】:我遇到了 MongoDB Kafka 连接器的问题。 我正在尝试将来自控制台生产者(和控制台消费者)的 json 消息生成到 Kafka 主题中。
当消息小于 4096 字节时,它会被正确消费。但是当消息大于 4096 字节时,我得到了这个异常:
ERROR WorkerSinkTaskid=scraper-mongo-sink-0 Error converting message value in topic 'rawdata' partition 0 at offset 154 and timestamp 1636471830852: Converting byte[] to Kafka Connect data failed due to serialization error: (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
.......
.......
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input in VALUE_STRING
at [Source: (byte[])" ........."[truncated 3595 bytes]; line: 1, column: 4096]
Caused by: com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input in VALUE_STRING
at [Source: (byte[])" "....[truncated 3595 bytes]; line: 1, column: 4096]
有没有人知道是什么导致了这个错误?更重要的是,如何解决这个问题?
注意。我尝试修改代理的一些默认属性,以及生产者/消费者,例如:offset.metadata.max.bytes、max.request.size、message.max.bytes、fetch.max.bytes 等。
请大家帮忙
【问题讨论】:
IIRC,控制台生产者可能正在截断数据。因此,连接器侧没有什么需要修复的 感谢您的回答。所以如果我理解正确,我想做的事情不会通过控制台生产者工作?应该改为开发生产者,还是有解决方案来完成我试图通过控制台生产者完成的工作? 一个建议是使用 shell 重定向kafka-console-producer ... < data.json
(确保每行有一个 JSON 对象)。或者你可以使用kcat
CLI 工具来代替
【参考方案1】:
您可以尝试改用kcat
工具,但我隐约记得以前遇到过这个问题,我可能看过源代码,但无论如何,替代方法是使用来自 shell 的输入重定向而不是键入 (或粘贴)那么多数据。 (如果是粘贴,那么问题要么是剪贴板,要么是终端,而不是 Kafka)
kafka-console-producer ... < data.json
确保每行有一个 JSON 对象/数组/值
【讨论】:
以上是关于MongoDB Sink 连接器:Apache Kafka 中的消息被截断的主要内容,如果未能解决你的问题,请参考以下文章
在 docker 上添加 MongoDB Sink 连接器?
kafka jdbc sink连接器抛出org.apache.kafka.connect.errors.DataException(结构模式的字段名称未正确指定)插入PG表