Kafka Connect - MongoDB 源连接器 - 管道不工作
Posted
技术标签:
【中文标题】Kafka Connect - MongoDB 源连接器 - 管道不工作【英文标题】:Kafka Connect - MongoDB Source Connector - Pipeline Not Working 【发布时间】:2021-11-23 03:38:46 【问题描述】:我正在使用 MongoDB 源连接器设置 Kafka 连接器。
配置如下:
"name": "MongoSourceConn",
"config":
"name": "MongoSourceConn",
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter.schemas.enable": false,
"value.converter.schema.registry.url":"http://schema-registry:8081",
"publish.full.document.only": true,
"topics": "test_topic",
"connection.uri": "mongodb://siteUserAdmin:rstatools@rsgadcmgo5:27017",
"database": "kafka",
"collection": "test_topic",
"pipeline": "[ \"$match\": \"$and\": [ \"operationType\": \"$in\": [ \"update\",\"insert\" ], \"jobStatus\": \"$eq\": 5 ] ]"
"transforms":"dropPrefix",
"transforms.dropPrefix.regex":"kafka.test_topic",
"transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.replacement":"test_topic"
如果我删除“管道”行,源连接器工作正常,但显然所有文档都会被推送到主题,这不是我想要的。
如果我添加回“管道”行,源连接器不会将任何消息推送到我的主题,我不明白为什么。 我错过了什么? 以下是我们 mongo 中的文档的样子:
"_id" : ObjectId("61570b1d21589e03f8011235"),
"jobId" : "04bba49d-098b-4d4c-adde-4578d31f20df",
"jobStatus" : 5,
"data" : null,
"createdOn" : "2021-10-01 13:20:29.215691"
配置是通过 rest api 推送的,所以这就是为什么它具有带有所有转义字符 (\") 的“字典”外观。
谢谢。
【问题讨论】:
有了pipeline
配置,连接器的状态是什么?是'RUNNING
吗? Kafka Connect 工作器日志中是否有任何错误或警告?
在确认创建连接器后,控制台正在向这些垃圾邮件发送这些垃圾邮件:[2021-10-01 14:22:48,064] INFO WorkerSourceTaskid=MongoSourceCn_operationType
字段,那么您希望匹配什么?显然,管道有效,因为您排除了所有没有 operationType
或 "update"
或 "insert"
的事件
【参考方案1】:
很明显,这条管道永远不会匹配,因为它当前包含\"operationType\": \"$in\": [ \"update\",\"insert\" ]
你提到你删除了它,但没有看到更多内容,不可能确切知道你是如何删除它的,所以那里可能出了点问题。
此外,您还不清楚获得数据后的确切外观。您在 Mongo 中显示一条消息,但可能会被包装到其他内容中(例如,由于变更流),因此字段 jobStatus 可能在顶层不可用,但最终嵌套。
我会推荐以下步骤:
-
在没有管道的情况下检查数据在 kafka 中的外观
从只做一件事的最简单的管道开始
在您能够以某种方式使用管道之前一直使用它
然后继续扩展逻辑直到你回到你想要的状态
我知道这些步骤有点笼统,但加上上面指出的内容,希望就足够了。
【讨论】:
以上是关于Kafka Connect - MongoDB 源连接器 - 管道不工作的主要内容,如果未能解决你的问题,请参考以下文章
kafka connect - 审计 - 在任务完成时触发事件
kafka connect debezium mongodb连接器无法加载
Debezium MongoDB 连接器错误:org.apache.kafka.connect.errors.ConnectException:错误处理程序中超出公差