Kafka Connect - MongoDB 源连接器 - 管道不工作

Posted

技术标签:

【中文标题】Kafka Connect - MongoDB 源连接器 - 管道不工作【英文标题】:Kafka Connect - MongoDB Source Connector - Pipeline Not Working 【发布时间】:2021-11-23 03:38:46 【问题描述】:

我正在使用 MongoDB 源连接器设置 Kafka 连接器。

配置如下:


  "name": "MongoSourceConn",
  "config": 
    "name": "MongoSourceConn",
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false,
    "value.converter.schema.registry.url":"http://schema-registry:8081",
    "publish.full.document.only": true,
    "topics": "test_topic",
    "connection.uri": "mongodb://siteUserAdmin:rstatools@rsgadcmgo5:27017",
    "database": "kafka",
    "collection": "test_topic",
    "pipeline": "[ \"$match\":  \"$and\": [ \"operationType\":  \"$in\": [ \"update\",\"insert\" ], \"jobStatus\": \"$eq\": 5 ]  ]"

    "transforms":"dropPrefix",
    "transforms.dropPrefix.regex":"kafka.test_topic",
    "transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropPrefix.replacement":"test_topic"

如果我删除“管道”行,源连接器工作正常,但显然所有文档都会被推送到主题,这不是我想要的。

如果我添加回“管道”行,源连接器不会将任何消息推送到我的主题,我不明白为什么。 我错过了什么? 以下是我们 mongo 中的文档的样子:


    "_id" : ObjectId("61570b1d21589e03f8011235"),
    "jobId" : "04bba49d-098b-4d4c-adde-4578d31f20df",
    "jobStatus" : 5,
    "data" : null,
    "createdOn" : "2021-10-01 13:20:29.215691"

配置是通过 rest api 推送的,所以这就是为什么它具有带有所有转义字符 (\") 的“字典”外观。

谢谢。

【问题讨论】:

有了pipeline 配置,连接器的状态是什么?是'RUNNING吗? Kafka Connect 工作器日志中是否有任何错误或警告? 在确认创建连接器后,控制台正在向这些垃圾邮件发送这些垃圾邮件:[2021-10-01 14:22:48,064] INFO WorkerSourceTaskid=MongoSourceCn_-0 正在刷新 0 条未完成的消息用于偏移提交(org.apache.kafka.connect.runtime.WorkerSourceTask) 从日志中我觉得还不错,我认为问题正在酝酿中 顺便说一下状态正在运行。 您的文档没有operationType 字段,那么您希望匹配什么?显然,管道有效,因为您排除了所有没有 operationType"update""insert" 的事件 【参考方案1】:

很明显,这条管道永远不会匹配,因为它当前包含\"operationType\": \"$in\": [ \"update\",\"insert\" ]

你提到你删除了它,但没有看到更多内容,不可能确切知道你是如何删除它的,所以那里可能出了点问题。

此外,您还不清楚获得数据后的确切外观。您在 Mongo 中显示一条消息,但可能会被包装到其他内容中(例如,由于变更流),因此字段 jobStatus 可能在顶层不可用,但最终嵌套。

我会推荐以下步骤:

    在没有管道的情况下检查数据在 kafka 中的外观 从只做一件事的最简单的管道开始 在您能够以某种方式使用管道之前一直使用它 然后继续扩展逻辑直到你回到你想要的状态

我知道这些步骤有点笼统,但加上上面指出的内容,希望就足够了。

【讨论】:

以上是关于Kafka Connect - MongoDB 源连接器 - 管道不工作的主要内容,如果未能解决你的问题,请参考以下文章

kafka connect - 审计 - 在任务完成时触发事件

kafka connect debezium mongodb连接器无法加载

Debezium MongoDB 连接器错误:org.apache.kafka.connect.errors.ConnectException:错误处理程序中超出公差

在远程 MSK kafka 集群上使用 kafka 连接 mongoDB debezium 源连接器

Kafka 消息包含控制字符(MongoDB 源连接器)

在跟踪集合上的更新操作时,如何在使用 kafka mongodb 源连接器时获取完整文档?