在跟踪集合上的更新操作时,如何在使用 kafka mongodb 源连接器时获取完整文档?

Posted

技术标签:

【中文标题】在跟踪集合上的更新操作时,如何在使用 kafka mongodb 源连接器时获取完整文档?【英文标题】:How to get full document when using kafka mongodb source connector when tracking update operations on a collection? 【发布时间】:2020-11-07 23:53:11 【问题描述】:

我正在将 Kakfa MongoDB 源连接器 [https://www.confluent.io/hub/mongodb/kafka-connect-mongodb] 与 confluent 平台 v5.4 一起使用。下面是我的 MongoDB 源连接器配置


    "name": "mongodb-replica-set-connector",
    "config": 
        "tasks.max": 1,
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "connection.uri": "mongodb://<username:password>@<MongoDB-Server-IP-Or-DNS>/<DB-Name>?ssl=false&authSource=<DB-Name>&retryWrites=true&w=majority",
        "database": "<DB-Name>",
        "collection": "<Collection-Name>",
        "topic.prefix": ""
    

在将记录插入指定集合时,我会获得完整且正确的文档详细信息。但是当我执行删除或更新操作时,我没有得到完整的文档。下面是从读取配置中指定的主题的流中删除和更新操作的屏幕截图。 我的问题是 - 我应该在配置中指定什么以便在执行更新操作时获得完整的文档?有什么方法可以获取被删除文档的 id 或 key 等信息?

【问题讨论】:

【参考方案1】:

使用 MongoDB 连接器配置中的属性 publish.full.document.only": "true" 获取完整文档,以防在 MongoDB 集合上执行任何创建和更新操作。无法跟踪删除操作,因为它不符合 CDC(更改数据捕获)概念的概念。只能捕获数据中的更改(创建/更新)。

【讨论】:

【参考方案2】:

您可以在 Kafka 连接器中将 change.stream.full.document 参数设置为 updateLookup,以获得用于插入和更新的 fullDocument 字段。

也使用这种方法发送删除,但fullDocument 字段将为空。

来源:https://docs.mongodb.com/kafka-connector/current/source-connector/configuration-properties/change-stream/#std-label-source-configuration-change-stream

【讨论】:

以上是关于在跟踪集合上的更新操作时,如何在使用 kafka mongodb 源连接器时获取完整文档?的主要内容,如果未能解决你的问题,请参考以下文章

跟踪用户在 App Store 上的新下载和更新

如何在Rails应用程序中使用Kafka?

我们如何从带有 debezium kafka 连接器的副本集中的辅助 mongodb 节点跟踪 oplog?

下载 net.cakesolutions:scala-kafka-client 时出错 - 未找到

如何在 asp.net mvc 中创建审计日志/审计跟踪

Firestore / Cloud功能 - 如何在更改时更新Firestore数据