在远程 MSK kafka 集群上使用 kafka 连接 mongoDB debezium 源连接器

Posted

技术标签:

【中文标题】在远程 MSK kafka 集群上使用 kafka 连接 mongoDB debezium 源连接器【英文标题】:use kafka connect mongoDB debezium souce connector on a remote MSK kafka cluster 【发布时间】:2020-05-17 04:09:57 【问题描述】:

我想从 MongoDB 中读取数据到 Kafka 的主题中。我设法通过使用以下连接器属性文件在本地完成这项工作:

name=mongodb-source-connectorszes
connector.class=io.debezium.connector.mongodb.MongoDbConnector
mongodb.hosts=test/localhost:27017
database.history.kafka.bootstrap.servers=kafka:9092
mongodb.name=mongo_conn
database.whitelist=test
initial.sync.max.threads=1
tasks.max=1

连接工作人员具有以下配置:

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000


zookeeper.connect=localhost:2181

rest.port=18083

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include 
# any combination of: 
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples: 
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java/test

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
bootstrap.servers=localhost:9092

这在我本地的 kafka 中完美运行。我想在远程 MSK Kafka 集群上运行它。 由于 kafka MSK 中没有对新的 kafka connect 插件的内置支持,我面临着使我的 kafka connect 源 mongo 插件工作的困难,要从我的本地机器导出连接器,我带来了以下修改: 在连接器属性级别:

name=mongodb-source-connectorszes
    connector.class=io.debezium.connector.mongodb.MongoDbConnector
    mongodb.hosts=test/localhost:27017  #keeping the same local mongo
    database.history.kafka.bootstrap.servers=remote-msk-kakfa-brokers:9092
    mongodb.name=mongo_conn
    database.whitelist=test
    initial.sync.max.threads=1
    tasks.max=1

在连接工作者层面,我带来了以下修改:

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000


zookeeper.connect=remote-msk-kakfa-zookeeper:9092:2181

rest.port=18083

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include 
# any combination of: 
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples: 
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/share/java/test

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
bootstrap.servers=remote-msk-kakfa-brokers:9092:9092

但似乎这还不够,因为我收到以下错误:

[2020-01-31 11:58:01,619] WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 118 : mongo_conn.test.docs=UNKNOWN_TOPIC_OR_PARTITION (org.apache.kafka.clients.NetworkClient:1031)
[2020-01-31 11:58:01,731] WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 119 : mongo_conn.test.docs=UNKNOWN_TOPIC_OR_PARTITION (org.apache.kafka.clients.NetworkClient:1031)

通常,我会设法从我的本地机器请求 Kafka MSK 集群(通过使用 ***,并穿梭到 EC2 实例)。例如,列出远程 kafka msk 集群中的主题。我只需要这样做:

bin/kafka-topics.sh --list --zookeeper  remote-zookeeper-server:2181

通过转到我本地的 kafka 安装文件夹。

并且这个命令完美地工作,无需更改我本地机器中的 server.properties。知道如何解决这个问题,以便将 kafka Debezium mongo Source 导出到 kafka MSK 集群。

【问题讨论】:

错字?您将端口两次放入引导服务器。此外,Mongo 配置历史引导程序中的值不是您的 MSK 集群 而且 Kafka Connect 不使用 zookeeper.connect... 你在哪里找到的那​​个属性? 【参考方案1】:

建议使用连接分布式脚本和属性来运行 Connect/Debezium

任何说 zookeeper.connect 的东西都应该被删除(只有 Kafka 代理使用它)。任何说引导服务器都应该指向 MSK 给你的地址。

如果您遇到连接错误,请确保检查防火墙/VPC 设置

【讨论】:

那行得通我实际上不得不更改主题级别的conf(启用主题创建),非常感谢 建议提前自己创建主题 实际上我们希望在添加新集合时自动创建主题。连接器创建topis的缺点是什么? 它们使用代理定义的默认分区号和保留设置。

以上是关于在远程 MSK kafka 集群上使用 kafka 连接 mongoDB debezium 源连接器的主要内容,如果未能解决你的问题,请参考以下文章

将 Lambda 连接到 AWS MSK 中的 kafka 主题的最经济有效的方法是啥?

AWS Glue集成Kafka/MSK

AWS Glue集成Kafka/MSK

Kafka 连接设置以使用 AWS MSK 从 Aurora 发送记录

Kafka Connect 与 Amazon MSK

Amazon Kinesis 与 AWS Manage Service Kafka (MSK) -(从本地连接)