Confluent Connect unable to create a sink using MongoDB

Posted: 2021-08-25 10:32:37

Question:

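I am implementing a Kafka MongoDB connector to sink data into MongoDB. I configured the connector by following the documentation, but it cannot connect. I get the logs below from Connect. All components run in Docker, and my database is installed on the local machine.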

[2021-06-08 12:14:14,232] INFO Cluster created with settings hosts=[192.168.1.23:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms' (org.mongodb.driver.cluster:71)
[2021-06-08 12:14:14,262] INFO MongoSinkTopicConfig values:
        change.data.capture.handler =
        collection = test
        database = admin
        delete.on.null.values = false
        document.id.strategy = com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
        document.id.strategy.overwrite.existing = false
        document.id.strategy.partial.key.projection.list =
        document.id.strategy.partial.key.projection.type =
        document.id.strategy.partial.value.projection.list =
        document.id.strategy.partial.value.projection.type =
        document.id.strategy.uuid.format = string
        errors.log.enable = false
        errors.tolerance = none
        field.renamer.mapping = []
        field.renamer.regexp = []
        key.projection.list =
        key.projection.type = none
        max.batch.size = 0
        max.num.retries = 1
        namespace.mapper = com.mongodb.kafka.connect.sink.namespace.mapping.DefaultNamespaceMapper
        namespace.mapper.error.if.invalid = false
        namespace.mapper.key.collection.field =
        namespace.mapper.key.database.field =
        namespace.mapper.value.collection.field =
        namespace.mapper.value.database.field =
        post.processor.chain = [com.mongodb.kafka.connect.sink.processor.DocumentIdAdder]
        rate.limiting.every.n = 0
        rate.limiting.timeout = 0
        retries.defer.timeout = 5000
        topic = test
        value.projection.list =
        value.projection.type = none
        writemodel.strategy = com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy
 (com.mongodb.kafka.connect.sink.MongoSinkTopicConfig:361)
[2021-06-08 12:14:14,265] INFO MongoSinkTopicConfig values:
        change.data.capture.handler =
        collection = test
        database = admin
        delete.on.null.values = false
        document.id.strategy = com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
        document.id.strategy.overwrite.existing = false
        document.id.strategy.partial.key.projection.list =
        document.id.strategy.partial.key.projection.type =
        document.id.strategy.partial.value.projection.list =
        document.id.strategy.partial.value.projection.type =
        document.id.strategy.uuid.format = string
        errors.log.enable = false
        errors.tolerance = none
        field.renamer.mapping = []
        field.renamer.regexp = []
        key.projection.list =
        key.projection.type = none
        max.batch.size = 0
        max.num.retries = 1
        namespace.mapper = com.mongodb.kafka.connect.sink.namespace.mapping.DefaultNamespaceMapper
        namespace.mapper.error.if.invalid = false
        namespace.mapper.key.collection.field =
        namespace.mapper.key.database.field =
        namespace.mapper.value.collection.field =
        namespace.mapper.value.database.field =
        post.processor.chain = [com.mongodb.kafka.connect.sink.processor.DocumentIdAdder]
        rate.limiting.every.n = 0
        rate.limiting.timeout = 0
        retries.defer.timeout = 5000
        topic = test
        value.projection.list =
        value.projection.type = none
        writemodel.strategy = com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy
 (com.mongodb.kafka.connect.sink.MongoSinkTopicConfig:361)
[2021-06-08 12:14:14,275] INFO Cluster created with settings hosts=[192.168.1.23:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms' (org.mongodb.driver.cluster:71)
[2021-06-08 12:14:16,269] INFO Exception in monitor thread while connecting to server 192.168.1.23:27017 (org.mongodb.driver.cluster:76)
com.mongodb.MongoSocketOpenException: Exception opening socket
        at com.mongodb.internal.connection.SocketStream.open(SocketStream.java:70)
        at com.mongodb.internal.connection.InternalStreamConnection.open(InternalStreamConnection.java:143)
        at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.lookupServerDescription(DefaultServerMonitor.java:188)
        at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:144)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
        at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)
        at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)
        at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)
        at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.base/java.net.Socket.connect(Socket.java:609)
        at com.mongodb.internal.connection.SocketStreamHelper.initialize(SocketStreamHelper.java:107)
        at com.mongodb.internal.connection.SocketStream.initializeSocket(SocketStream.java:79)
        at com.mongodb.internal.connection.SocketStream.open(SocketStream.java:65)
        ... 4 more
[2021-06-08 12:14:16,316] INFO Exception in monitor thread while connecting to server 192.168.1.23:27017 (org.mongodb.driver.cluster:76)
com.mongodb.MongoSocketOpenException: Exception opening socket
        at com.mongodb.internal.connection.SocketStream.open(SocketStream.java:70)
        at com.mongodb.internal.connection.InternalStreamConnection.open(InternalStreamConnection.java:143)
        at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.lookupServerDescription(DefaultServerMonitor.java:188)
        at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:144)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
        at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)
        at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)
        at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)
        at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.base/java.net.Socket.connect(Socket.java:609)
        at com.mongodb.internal.connection.SocketStreamHelper.initialize(SocketStreamHelper.java:107)
        at com.mongodb.internal.connection.SocketStream.initializeSocket(SocketStream.java:79)
        at com.mongodb.internal.connection.SocketStream.open(SocketStream.java:65)
        ... 4 more

I checked that the Docker container can ping my local IP. I also verified that MongoDB is up and running.

Can anyone help me resolve this? Any help is appreciated. Thanks.

Comments:

Have you tried connecting to MongoDB with the mongo shell from inside the container? Please share your connector configuration.

Answer 1:

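The problem occurred simply because the IP address I used to connect to MongoDB was not the loopback address. By default, mongod accepts connections only on 127.0.0.1, so any other address has to be allowed explicitly. The bindIp property in the mongod configuration does this: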

net:
  port: 27017
  bindIp: 127.0.0.1,0.0.0.0

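Note: here I have allowed all IPs (0.0.0.0). You can restrict this to the addresses you need; a wildcard is not recommended in production. Restart MongoDB after changing the property in its configuration file.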
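
A successful ping only shows that the host is reachable; it says nothing about whether anything is listening on port 27017 at that address. A quick way to tell the two apart is to attempt a plain TCP connection from inside the Connect container. The following is a minimal sketch, assuming Python 3 is available in the container; check_tcp is a hypothetical helper written for this illustration, not part of MongoDB or Kafka Connect.

```python
import socket


def check_tcp(host: str, port: int, timeout: float = 3.0) -> str:
    """Attempt a TCP connection and classify the outcome.

    Returns "open" if the connection succeeds, "refused" if the peer
    actively rejects it (nothing listening on that address/port),
    "timeout" if no answer arrives in time, or "error: ..." otherwise.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return "open"
    except ConnectionRefusedError:
        # Same condition as java.net.ConnectException: Connection refused
        return "refused"
    except socket.timeout:
        return "timeout"
    except OSError as exc:
        return f"error: {exc}"
```

Calling check_tcp("192.168.1.23", 27017) from the container, with the address taken from the log above, should help narrow things down: "refused" points at the listening address (for example bindIp), while "timeout" more likely points at a firewall or routing problem.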
