Kafka 消息包含控制字符(MongoDB 源连接器)
Posted
技术标签:
【中文标题】Kafka 消息包含控制字符(MongoDB 源连接器)【英文标题】:Kafka message includes control characters (MongoDB Source Connector) 【发布时间】:2021-08-04 19:04:43 【问题描述】:我有一个 Kafka Connect MongoDB 源连接器(均通过 Confluent 平台)工作,但它创建的消息在开始时包含一个控制字符,这使得该消息的下游解析(到 JSON)比我想象的更难是。
正在运行的源连接器:
"name": "mongo-source-connector",
"config":
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://myUsername:myPassword@my-mongodb-host-address:27017",
"database": "myDatabase",
"collection": "myCollection",
"change.stream.full.document": "updateLookup",
"errors.log.enable": true
此Source连接器在Kafka主题中创建的消息如下(注意前导控制字符):
�"_id": "_data": "82609E8726000000012B022C0100296E5A1004BE208B099BCF4106822DE274B0B9D39A46645F69640064609E87267125D17D12D180620004", "operationType": "insert", "clusterTime": "$timestamp": "t": 1621002022, "i": 1, "fullDocument": "_id": "$oid": "609e87267125d17d12d18062", "uuid": "23534a5c-ad82-431c-a821-6b4aed4f59a1", "endingNumber": 10, "ns": "db": "myDatabase", "coll": "myCollection", "documentKey": "_id": "$oid": "609e87267125d17d12d18062"
控制字符使下游解析为 JSON 变得困难,因为它使原本有效的 JSON 无效。我不知道它为什么存在或如何摆脱它。
我想,我可以先解析出像这个控制字符这样的垃圾,然后再像 JSON 一样对待它,但这似乎是我想避免的创可贴。
我现在处理消息的方式,我认为这是无关紧要的,因为我已经测试过它可以在没有控制字符的情况下使用有效的 JSON,如下所示:
data class MyChangesetMessageId (
@JsonProperty("_data")
val data: String
)
data class MyChangesetMessageTimestamp (
val t: Long,
val i: Int
)
data class MyChangesetMessageClusterTime (
@JsonProperty("\$timestamp")
val timestamp: MyChangesetMessageTimestamp
)
data class MyChangesetOid (
@JsonProperty("\$oid")
val oid: String
)
data class MyChangesetMessageFullDocument (
@JsonProperty("_id")
val id: MyChangesetOid,
val uuid: String,
val endingNumber: Int
)
data class MyChangesetMessageNS (
val db: String,
val coll: String
)
data class MyChangesetDocumentKey (
@JsonProperty("_id")
val id: MyChangesetOid
)
data class MyChangesetMessage (
@JsonProperty("_id")
val id: MyChangesetMessageId,
val operationType: String,
val clusterTime: MyChangesetMessageClusterTime,
val fullDocument: MyChangesetMessageFullDocument,
val ns: MyChangesetMessageNS,
val documentKey: MyChangesetDocumentKey
)
...
val objectMapper = jacksonObjectMapper()
val changesetMessage = objectMapper.readValue(message, MyChangesetMessage::class.java)
感谢任何想法。
【问题讨论】:
你使用什么转换器类? 不管默认值是什么,看起来像output.json.formatter=com.mongodb.kafka.connect.source.json.formatter.DefaultJson
。现在用com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson
试试,因为我不知道还有什么可以尝试的。根据docs.mongodb.com/kafka-connector/current/kafka-source,默认值也是output.format.key=json
和output.format.value=json
我问的是 value.converter
不是格式化程序
@OneCricketeer 源连接器似乎没有value.converter
属性。 Sink 连接器可以,但按原样工作正常。我能找到的最接近 Source 的是 output.format.key
和 output.format.value
属性,它们可以是 json
、bson
或 schema
。 value.converter=io.confluent.connect.avro.AvroConverter
因为我想我不能用 Source 连接器覆盖它?
@OneCricketeer 我将其切换为StringConverter
并删除了魔术字符。谢谢你的主意。如果你想回答我可以接受。
【参考方案1】:
您所指的字符通常与已解码为字符串的 Avro 序列化数据常见。
检查 Connect 工作程序中的键/值转换器设置,因为您尚未在连接器中定义它。
如果您想解析为 JSON,请使用 JSONConverter,否则如果您想跳过数据类定义并从 Avro 模式生成它,Avro 也可以工作
【讨论】:
以上是关于Kafka 消息包含控制字符(MongoDB 源连接器)的主要内容,如果未能解决你的问题,请参考以下文章
在 kafka 控制台上无法输入大小超过 4095 个字符的消息