在远程 MSK kafka 集群上使用 kafka 连接 mongoDB debezium 源连接器
Posted
技术标签:
【中文标题】在远程 MSK kafka 集群上使用 kafka 连接 mongoDB debezium 源连接器【英文标题】:use kafka connect mongoDB debezium souce connector on a remote MSK kafka cluster 【发布时间】:2020-05-17 04:09:57 【问题描述】:我想从 MongoDB 中读取数据到 Kafka 的主题中。我设法通过使用以下连接器属性文件在本地完成这项工作:
name=mongodb-source-connectorszes
connector.class=io.debezium.connector.mongodb.MongoDbConnector
mongodb.hosts=test/localhost:27017
database.history.kafka.bootstrap.servers=kafka:9092
mongodb.name=mongo_conn
database.whitelist=test
initial.sync.max.threads=1
tasks.max=1
连接工作人员具有以下配置:
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
zookeeper.connect=localhost:2181
rest.port=18083
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java/test
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
bootstrap.servers=localhost:9092
这在我本地的 kafka 中完美运行。我想在远程 MSK Kafka 集群上运行它。 由于 kafka MSK 中没有对新的 kafka connect 插件的内置支持,我面临着使我的 kafka connect 源 mongo 插件工作的困难,要从我的本地机器导出连接器,我带来了以下修改: 在连接器属性级别:
name=mongodb-source-connectorszes
connector.class=io.debezium.connector.mongodb.MongoDbConnector
mongodb.hosts=test/localhost:27017 #keeping the same local mongo
database.history.kafka.bootstrap.servers=remote-msk-kakfa-brokers:9092
mongodb.name=mongo_conn
database.whitelist=test
initial.sync.max.threads=1
tasks.max=1
在连接工作者层面,我带来了以下修改:
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
zookeeper.connect=remote-msk-kakfa-zookeeper:9092:2181
rest.port=18083
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java/test
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
bootstrap.servers=remote-msk-kakfa-brokers:9092:9092
但似乎这还不够,因为我收到以下错误:
[2020-01-31 11:58:01,619] WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 118 : mongo_conn.test.docs=UNKNOWN_TOPIC_OR_PARTITION (org.apache.kafka.clients.NetworkClient:1031)
[2020-01-31 11:58:01,731] WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 119 : mongo_conn.test.docs=UNKNOWN_TOPIC_OR_PARTITION (org.apache.kafka.clients.NetworkClient:1031)
通常,我会设法从我的本地机器请求 Kafka MSK 集群(通过使用 ***,并穿梭到 EC2 实例)。例如,列出远程 kafka msk 集群中的主题。我只需要这样做:
bin/kafka-topics.sh --list --zookeeper remote-zookeeper-server:2181
通过转到我本地的 kafka 安装文件夹。
并且这个命令完美地工作,无需更改我本地机器中的 server.properties。知道如何解决这个问题,以便将 kafka Debezium mongo Source 导出到 kafka MSK 集群。
【问题讨论】:
错字?您将端口两次放入引导服务器。此外,Mongo 配置历史引导程序中的值不是您的 MSK 集群 而且 Kafka Connect 不使用 zookeeper.connect... 你在哪里找到的那个属性? 【参考方案1】:建议使用连接分布式脚本和属性来运行 Connect/Debezium
任何说 zookeeper.connect 的东西都应该被删除(只有 Kafka 代理使用它)。任何说引导服务器都应该指向 MSK 给你的地址。
如果您遇到连接错误,请确保检查防火墙/VPC 设置
【讨论】:
那行得通我实际上不得不更改主题级别的conf(启用主题创建),非常感谢 建议提前自己创建主题 实际上我们希望在添加新集合时自动创建主题。连接器创建topis的缺点是什么? 它们使用代理定义的默认分区号和保留设置。以上是关于在远程 MSK kafka 集群上使用 kafka 连接 mongoDB debezium 源连接器的主要内容,如果未能解决你的问题,请参考以下文章
将 Lambda 连接到 AWS MSK 中的 kafka 主题的最经济有效的方法是啥?