覆盖 Mongo 源连接器

Posted

技术标签:

【中文标题】覆盖 Mongo 源连接器【英文标题】:Overriding Mongo Source connector 【发布时间】:2021-10-08 09:16:44 【问题描述】:

我打算使用 Kafka 连接来消费来自 MongoDB 的消息并发布到 Kafka 主题中。

默认情况下,Mongo 源连接器为每个集合创建一个主题。但是我会有很多收藏,并且希望所有收藏只有一个主题。消息将具有集合名称。

    覆盖 mongo-source 连接器是更好的方法吗?如果是这样,我应该牢记什么 是否已经提供了任何设置?我知道在创建时将collection 指定为空会监听所有集合。但它会为每个集合创建一个主题。

【问题讨论】:

我相信你可以使用RegexRouter这个 感谢@OneCricketeer,RegexRouter 帮助了我。 酷。随意把你的答案放在下面 【参考方案1】:

正如@onecricketeer 建议的那样,我使用了RegexRouter。因此无需重写 mongo 源连接器即可将所有集合中的文档发布到同一主题中。

这是我的配置,它监听所有匹配管道的集合并发布到mongodbTopic


"name": "mongo-source",
"config": 
    "tasks.max": "1",
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "<connection_uri>",
    "topic.prefix": "",
    "pipeline": "[\"$match\":  \"$or\": [\"fullDocument._kind\":\"collection\",\"fullDocument._isInverse\":false],\"ns.coll\": \"$regex\": /^(.*_related)$|^(my_collection_test)$/]",
    "poll.await.time.ms": 5,
    "poll.max.batch.size": 2000,
    "transforms": "dropPrefix",
    "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropPrefix.regex": ".*",
    "transforms.dropPrefix.replacement": "mongodbTopic",
    "errors.tolerance": "none",
    "copy.existing": true
    

更多信息 https://docs.confluent.io/platform/current/connect/transforms/regexrouter.html

【讨论】:

以上是关于覆盖 Mongo 源连接器的主要内容,如果未能解决你的问题,请参考以下文章

Kafka 消息包含控制字符(MongoDB 源连接器)

等效于 mongo 的 out:reduce 选项在 hadoop

如何使用适用于 Spark 的 Mongo-Hadoop 连接器删除文档(记录)

mongo连接数满问题处理

sh Mongo SQL连接器 - Ubuntu 18.04

python连接Mongo数据库