Confluent Connect unable to create sink using MongoDB
Posted: 2021-08-25 10:32:37
Question: I am implementing a Kafka MongoDB connector to sink data into MongoDB. I configured the connector by following the documentation, but it cannot connect. Kafka Connect prints the logs below. All components run in Docker, and my database is installed on the local machine.
[2021-06-08 12:14:14,232] INFO Cluster created with settings hosts=[192.168.1.23:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms' (org.mongodb.driver.cluster:71)
[2021-06-08 12:14:14,262] INFO MongoSinkTopicConfig values:
change.data.capture.handler =
collection = test
database = admin
delete.on.null.values = false
document.id.strategy = com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
document.id.strategy.overwrite.existing = false
document.id.strategy.partial.key.projection.list =
document.id.strategy.partial.key.projection.type =
document.id.strategy.partial.value.projection.list =
document.id.strategy.partial.value.projection.type =
document.id.strategy.uuid.format = string
errors.log.enable = false
errors.tolerance = none
field.renamer.mapping = []
field.renamer.regexp = []
key.projection.list =
key.projection.type = none
max.batch.size = 0
max.num.retries = 1
namespace.mapper = com.mongodb.kafka.connect.sink.namespace.mapping.DefaultNamespaceMapper
namespace.mapper.error.if.invalid = false
namespace.mapper.key.collection.field =
namespace.mapper.key.database.field =
namespace.mapper.value.collection.field =
namespace.mapper.value.database.field =
post.processor.chain = [com.mongodb.kafka.connect.sink.processor.DocumentIdAdder]
rate.limiting.every.n = 0
rate.limiting.timeout = 0
retries.defer.timeout = 5000
topic = test
value.projection.list =
value.projection.type = none
writemodel.strategy = com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy
(com.mongodb.kafka.connect.sink.MongoSinkTopicConfig:361)
[2021-06-08 12:14:14,265] INFO MongoSinkTopicConfig values:
change.data.capture.handler =
collection = test
database = admin
delete.on.null.values = false
document.id.strategy = com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
document.id.strategy.overwrite.existing = false
document.id.strategy.partial.key.projection.list =
document.id.strategy.partial.key.projection.type =
document.id.strategy.partial.value.projection.list =
document.id.strategy.partial.value.projection.type =
document.id.strategy.uuid.format = string
errors.log.enable = false
errors.tolerance = none
field.renamer.mapping = []
field.renamer.regexp = []
key.projection.list =
key.projection.type = none
max.batch.size = 0
max.num.retries = 1
namespace.mapper = com.mongodb.kafka.connect.sink.namespace.mapping.DefaultNamespaceMapper
namespace.mapper.error.if.invalid = false
namespace.mapper.key.collection.field =
namespace.mapper.key.database.field =
namespace.mapper.value.collection.field =
namespace.mapper.value.database.field =
post.processor.chain = [com.mongodb.kafka.connect.sink.processor.DocumentIdAdder]
rate.limiting.every.n = 0
rate.limiting.timeout = 0
retries.defer.timeout = 5000
topic = test
value.projection.list =
value.projection.type = none
writemodel.strategy = com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy
(com.mongodb.kafka.connect.sink.MongoSinkTopicConfig:361)
[2021-06-08 12:14:14,275] INFO Cluster created with settings hosts=[192.168.1.23:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms' (org.mongodb.driver.cluster:71)
[2021-06-08 12:14:16,269] INFO Exception in monitor thread while connecting to server 192.168.1.23:27017 (org.mongodb.driver.cluster:76)
com.mongodb.MongoSocketOpenException: Exception opening socket
at com.mongodb.internal.connection.SocketStream.open(SocketStream.java:70)
at com.mongodb.internal.connection.InternalStreamConnection.open(InternalStreamConnection.java:143)
at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.lookupServerDescription(DefaultServerMonitor.java:188)
at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:144)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)
at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)
at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)
at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.base/java.net.Socket.connect(Socket.java:609)
at com.mongodb.internal.connection.SocketStreamHelper.initialize(SocketStreamHelper.java:107)
at com.mongodb.internal.connection.SocketStream.initializeSocket(SocketStream.java:79)
at com.mongodb.internal.connection.SocketStream.open(SocketStream.java:65)
... 4 more
[2021-06-08 12:14:16,316] INFO Exception in monitor thread while connecting to server 192.168.1.23:27017 (org.mongodb.driver.cluster:76)
com.mongodb.MongoSocketOpenException: Exception opening socket
at com.mongodb.internal.connection.SocketStream.open(SocketStream.java:70)
at com.mongodb.internal.connection.InternalStreamConnection.open(InternalStreamConnection.java:143)
at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.lookupServerDescription(DefaultServerMonitor.java:188)
at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:144)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)
at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)
at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)
at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.base/java.net.Socket.connect(Socket.java:609)
at com.mongodb.internal.connection.SocketStreamHelper.initialize(SocketStreamHelper.java:107)
at com.mongodb.internal.connection.SocketStream.initializeSocket(SocketStream.java:79)
at com.mongodb.internal.connection.SocketStream.open(SocketStream.java:65)
... 4 more
I checked that the Docker container can ping my local IP. I also verified that MongoDB is up and running.
Can anyone help me resolve this? Any help is appreciated. Thanks.
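Comments:
Have you tried connecting to MongoDB with the mongo shell from inside the container? Please share your connector configuration.
Answer 1:
The problem occurred simply because the connector reaches MongoDB through an IP other than 127.0.0.1. By default mongod listens only on 127.0.0.1, so connections to any other address of the machine are refused until that address is allowed explicitly. This can be done with the bindIp property in the mongod configuration file: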
net:
  port: 27017
  bindIp: 127.0.0.1,0.0.0.0
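Note: here I have allowed all IPs. You can restrict this to the addresses you actually need; binding to the wildcard address is not recommended in production. Restart MongoDB after changing the property in its configuration file.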
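A successful ping only shows that the host is reachable. It does not show that anything accepts TCP connections on port 27017 at that address, which is why the log above can still report "Connection refused". As a rough way to check this from inside the Connect container, the sketch below tries a plain TCP connection and reports the outcome. The helper name check_tcp is made up for illustration, and it assumes a Python interpreter is available in the container; the host and port in the usage comment are the ones from the log.

```python
import socket


def check_tcp(host: str, port: int, timeout: float = 3.0) -> str:
    """Classify a TCP endpoint as 'open', 'refused', or 'unreachable'.

    check_tcp is a hypothetical helper for this thread, not part of any
    MongoDB or Kafka Connect tooling.
    """
    try:
        # create_connection resolves the host and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return "open"
    except ConnectionRefusedError:
        # The host answered, but nothing is listening on this address/port.
        # This matches the java.net.ConnectException in the log.
        return "refused"
    except OSError:
        # Timeouts, DNS failures, and routing errors all end up here.
        return "unreachable"


# Usage, run from inside the Connect container:
#   print(check_tcp("192.168.1.23", 27017))
```

A result of "refused" while ping works points at the listening address on the MongoDB side (bindIp) rather than at Docker networking. A result of "unreachable" would instead suggest a routing or firewall problem.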