我们可以在 mongodb 中更新/更新记录吗?数据源是kafka
Posted
技术标签:
【中文标题】我们可以在 mongodb 中更新/更新记录吗?数据源是kafka【英文标题】:Can we update/Upsert a record in mongodb? data source is kafka 【发布时间】:2020-04-02 08:09:29 【问题描述】:我们可以在 mongodb 中更新/更新记录,但是是否有任何方法或函数可以直接在 mongodb 中更新或更新文档,并且源系统是 kafka,目标是 mongodb。
【问题讨论】:
抱歉没有关注。在从 kafka 进程读取为 update/upsert 之后直接更新文档?你能扩展一下吗?可能会添加一些代码? 【参考方案1】:是的,我们可以更新/更新数据。 对于更新,您必须在 Kafka 连接器中定义一个参数。 并将要更新记录的列列入白名单。属性如下:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
value.projection.list=tokenNumber
value.projection.type=whitelist
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy
【讨论】:
有效!太糟糕了,文档太难理解了(没有足够的例子,或者只是花了太多时间才能正确理解)......属性的名称在我看来只是令人困惑并且违反直觉我看到很多开发人员很难设置这个了。 是的兄弟,你说得对,文档太难一口气理解了。 @nixxo_raa 您显示的配置中的 parameter 和 column 是什么?【参考方案2】:我一直在苦苦挣扎,最后我得到了答案。我使用了以下 Mongodb sink connector
我在他们的文档上苦恼了一段时间后,终于找到了解决方案。
这是我正在使用的确切 mongodb sink 连接器配置
"name": "mongodbsync",
"connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
"topics": "alpha-foobar",
"mongodb.connection.uri": "mongodb://localhost:27017/kafkaconnect?w=1&journal=true",
"mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.ProvidedInValueStrategy"
我在我的配置中将mongodb.writemodel.strategy
留空,所以它采用默认配置
我使用了来自同一连接器的 github 的以下文档的用例 2
我正在处理这种情况,将带有kafka-jdbc-source connect
的mysql表数据传输到mongodb sink
。
上述策略也可以在official docs 中找到 如果您有任何疑问,请随时提出。谢谢
【讨论】:
您建议的这种方法不能完全工作我收到一个名为 org.apache.kafka.connect.errors.DataException: 无法将密钥456
转换为 BsonDocument 的错误。\n \tat com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:157)\n\tat com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.clone(LazyBsonDocument.java:146) \n\tat com.mongodb.kafka.connect.sink.converter.SinkDocument.clone(SinkDocument.java:45)\n\tat
你必须确保你有在 kafka 中激活的序列化程序
org.apache.kafka.connect.errors.DataException: 无法将密钥 456
转换为 BsonDocument。\n\tat com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped (LazyBsonDocument.java:157)\n\tat com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.clone(LazyBsonDocument.java:146)\n\tat com.mongodb.kafka.connect.sink.converter.SinkDocument .clone(SinkDocument.java:45)\n\tat 这是我得到@Shubh的错误
让我分享我的连接器,以便您更好地理解
"config": "connector.class" : "com.mongodb.kafka.connect.MongoSinkConnector", "tasks.max":"1", "connection.uri":"mongodb: //xx.xx.xx:27017", "database":"topic1", "collection":"tag", "topics":"tag_update_new1", "key.converter": "org.apache.kafka.connect. storage.StringConverter", "value.converter":"io.confluent.connect.json.JsonSchemaConverter", "value.converter.schema.registry.url": "xx.xxx.xx.xx:8081",以上是关于我们可以在 mongodb 中更新/更新记录吗?数据源是kafka的主要内容,如果未能解决你的问题,请参考以下文章
使用 python 快速有效地更新数百万个 MongoDB 文档的技巧?
使用 Mongoose 在 MongoDB 中更新许多记录的正确方法是啥