Kafka Mongodb 接收器连接器 - 更新文档

Posted

技术标签:

【中文标题】Kafka Mongodb 接收器连接器 - 更新文档【英文标题】:Kafka Mongodb sink connector - update document 【发布时间】:2020-03-23 18:58:21 【问题描述】:

我们一直致力于开发 kafka 生态系统。让我顺其自然

Source(SQLServer) -> Debezium(CDC) -> Kafka Broker -> Kafka Stream(处理、连接等)-> Mongo 连接器 -> Mongo DB

现在我们进入最后一步,我们正在将处理后的数据插入 mongo dB,但现在我们需要更新数据而不是插入。

我们能否从 mongo sink 连接器获得 upsert(插入/更新)功能。至于现在我明白这是不可能的。

【问题讨论】:

您是否尝试过查看该连接器的 github 问题? 是的,我已经尝试并且仍在尝试。 您能说明一下您使用的是哪个 Mongo 接收器连接器吗? (我不知道有任何支持更新或删除),但只是好奇 我正在使用 com.mongodb.kafka.connect.MongoSinkConnector 连接器类 如果你想要更新和/或删除,你必须在这一行周围添加逻辑来相应地处理github.com/mongodb/mongo-kafka/blob/master/src/main/java/com/… 【参考方案1】:

请点击提供的链接,它包含有关 kafka mongo 连接器的所有信息。我已经成功实现了 upsert 功能。您只需要仔细阅读此文档即可。

Kafka Connector - Mongodb

【讨论】:

我也刚刚发布了一个答案。-***.com/questions/59245208/… @Shubh 线程上接受的答案对我有用...【参考方案2】:

实际上这是一个 upsert,如果 $uniqueFieldToUpdateOn 不在 mongo 中,我们要插入,或者如果它存在则更新如下。

根据您的用例更新/替换,有两种主要方法可以对集合中的数据更改进行建模,如下所述:

更新

以下配置状态:

    更新 $uniqueFieldToUpdateOn 为您要建模更新的记录所特有的字段。 AllowList(白名单)此字段与PartialValueStrategy 一起使用允许为 id 策略投影自定义值字段。 UpdateOneBusinessKeyTimestampStrategy 意味着只有上面声明的唯一字段引用的一个文档将被更新(最新时间戳获胜)。
"document.id.strategy":"com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy", 
"document.id.strategy.partial.value.projection.list":"$uniqueFieldToUpdateOn",
"document.id.strategy.partial.value.projection.type":"AllowList",
"writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneBusinessKeyTimestampStrategy" 

替换

注意这个模型是替换而不是更新,但可能仍然有用

以下配置状态:

    将 $uniqueFieldToUpdateOn 替换为您要为其建模的记录所特有的字段。 AllowList(白名单)此字段与PartialValueStrategy 一起使用允许为 id 策略投影自定义值字段。 ReplaceOneBusinessKeyStrategy 表示仅替换上面声明的唯一字段引用的一个文档。
"document.id.strategy":"com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy", 
"document.id.strategy.partial.value.projection.list":"$uniqueFieldToUpdateOn",
"document.id.strategy.partial.value.projection.type":"AllowList",
"writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy"

【讨论】:

我使用了建议的相同配置,但我无法成功地使连接器运行失败我收到错误org.apache.kafka.connect.errors.DataException: Could not convert key 456 into a BsonDocument.\n\tat com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:157)\n\tat com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.clone(LazyBsonDocument.java:146)\n\tat com.mongodb.kafka.connect.sink.converter.SinkDocument.clone(SinkDocument.java:45)\n\tat 是否有一个示例连接器可以用作参考 你能告诉我你的连接器配置,也许我可以帮助你吗? 因为我不能一次性发布完整的连接器,所以我把它分成部分第一部分是 ** "name":"tag-update2","config":"connector.class" :"com.mongodb.kafka.connect.MongoSinkConnector","tasks.max":"1","connection.uri":"mongodb://xx.xx.xx:27017","database":"staff" ,"collection":"user","topics":"user_update_new1","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"io.confluent.connect. json.JsonSchemaConverter" ** 另一半是 "value.converter.schema.registry.url": "xx.xxx.xx.xx:8081", "document.id.strategy":"com.mongodb.kafka.connect. sink.processor.id.strategy.PartialValueStrategy", "document.id.strategy.partial.value.projection.type":"user_id", "document.id.strategy.partial.value.projection.type":"AllowList" , "writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy" 抱歉无法将其作为一个整体发送 首先你有 document.id.strategy.partial.value.projection.type 两次,你应该有 document.id.strategy.partial.value.projection.list 和“user_id”,也你能告诉我你正在发送的消息的有效负载,包括密钥吗?看起来像转换密钥的问题

以上是关于Kafka Mongodb 接收器连接器 - 更新文档的主要内容,如果未能解决你的问题,请参考以下文章

将 Kafka 连接 MongoDB 连接器注册为 SCDF 中的应用程序

Kafka mongo db源连接器不起作用

如何为在 kubernetes 集群上运行的 Kafka Connect 配置 MongoDB 官方源连接器

无法使用 Confluent Elasticsearch 接收器连接器将 Kafka 主题数据转换为结构化 JSON

在跟踪集合上的更新操作时,如何在使用 kafka mongodb 源连接器时获取完整文档?

Debezium Kafka 连接器 mongodb