Kafka Mongodb 接收器连接器 - 更新文档
Posted
技术标签:
【中文标题】Kafka Mongodb 接收器连接器 - 更新文档【英文标题】:Kafka Mongodb sink connector - update document 【发布时间】:2020-03-23 18:58:21 【问题描述】:我们一直致力于开发 kafka 生态系统。让我顺其自然
Source(SQLServer) -> Debezium(CDC) -> Kafka Broker -> Kafka Stream(处理、连接等)-> Mongo 连接器 -> Mongo DB
现在我们进入最后一步,我们正在将处理后的数据插入 mongo dB,但现在我们需要更新数据而不是插入。
我们能否从 mongo sink 连接器获得 upsert(插入/更新)功能。至于现在我明白这是不可能的。
【问题讨论】:
您是否尝试过查看该连接器的 github 问题? 是的,我已经尝试并且仍在尝试。 您能说明一下您使用的是哪个 Mongo 接收器连接器吗? (我不知道有任何支持更新或删除),但只是好奇 我正在使用 com.mongodb.kafka.connect.MongoSinkConnector 连接器类 如果你想要更新和/或删除,你必须在这一行周围添加逻辑来相应地处理github.com/mongodb/mongo-kafka/blob/master/src/main/java/com/… 【参考方案1】:请点击提供的链接,它包含有关 kafka mongo 连接器的所有信息。我已经成功实现了 upsert 功能。您只需要仔细阅读此文档即可。
Kafka Connector - Mongodb
【讨论】:
我也刚刚发布了一个答案。-***.com/questions/59245208/… @Shubh 线程上接受的答案对我有用...【参考方案2】:实际上这是一个 upsert,如果 $uniqueFieldToUpdateOn 不在 mongo 中,我们要插入,或者如果它存在则更新如下。
根据您的用例更新/替换,有两种主要方法可以对集合中的数据更改进行建模,如下所述:
更新
以下配置状态:
-
更新 $uniqueFieldToUpdateOn 为您要建模更新的记录所特有的字段。
AllowList(白名单)此字段与
PartialValueStrategy
一起使用允许为 id 策略投影自定义值字段。
UpdateOneBusinessKeyTimestampStrategy 意味着只有上面声明的唯一字段引用的一个文档将被更新(最新时间戳获胜)。
"document.id.strategy":"com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
"document.id.strategy.partial.value.projection.list":"$uniqueFieldToUpdateOn",
"document.id.strategy.partial.value.projection.type":"AllowList",
"writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneBusinessKeyTimestampStrategy"
替换
注意这个模型是替换而不是更新,但可能仍然有用
以下配置状态:
-
将 $uniqueFieldToUpdateOn 替换为您要为其建模的记录所特有的字段。
AllowList(白名单)此字段与
PartialValueStrategy
一起使用允许为 id 策略投影自定义值字段。
ReplaceOneBusinessKeyStrategy 表示仅替换上面声明的唯一字段引用的一个文档。
"document.id.strategy":"com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
"document.id.strategy.partial.value.projection.list":"$uniqueFieldToUpdateOn",
"document.id.strategy.partial.value.projection.type":"AllowList",
"writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy"
【讨论】:
我使用了建议的相同配置,但我无法成功地使连接器运行失败我收到错误org.apache.kafka.connect.errors.DataException: Could not convert key 456 into a BsonDocument.\n\tat com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:157)\n\tat com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.clone(LazyBsonDocument.java:146)\n\tat com.mongodb.kafka.connect.sink.converter.SinkDocument.clone(SinkDocument.java:45)\n\tat
是否有一个示例连接器可以用作参考
你能告诉我你的连接器配置,也许我可以帮助你吗?
因为我不能一次性发布完整的连接器,所以我把它分成部分第一部分是 ** "name":"tag-update2","config":"connector.class" :"com.mongodb.kafka.connect.MongoSinkConnector","tasks.max":"1","connection.uri":"mongodb://xx.xx.xx:27017","database":"staff" ,"collection":"user","topics":"user_update_new1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"io.confluent.connect. json.JsonSchemaConverter" **
另一半是 "value.converter.schema.registry.url": "xx.xxx.xx.xx:8081", "document.id.strategy":"com.mongodb.kafka.connect. sink.processor.id.strategy.PartialValueStrategy", "document.id.strategy.partial.value.projection.type":"user_id", "document.id.strategy.partial.value.projection.type":"AllowList" , "writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy" 抱歉无法将其作为一个整体发送
首先你有 document.id.strategy.partial.value.projection.type 两次,你应该有 document.id.strategy.partial.value.projection.list 和“user_id”,也你能告诉我你正在发送的消息的有效负载,包括密钥吗?看起来像转换密钥的问题以上是关于Kafka Mongodb 接收器连接器 - 更新文档的主要内容,如果未能解决你的问题,请参考以下文章
将 Kafka 连接 MongoDB 连接器注册为 SCDF 中的应用程序
如何为在 kubernetes 集群上运行的 Kafka Connect 配置 MongoDB 官方源连接器
无法使用 Confluent Elasticsearch 接收器连接器将 Kafka 主题数据转换为结构化 JSON