"The $changeStream stage is only supported on replica sets" error while using mongodb-source-connect

Posted 2020-04-21 15:02:07

Question:

Happy New Year!

I'm posting because I ran into an error while running kafka-mongodb-source-connect. I tried to run connect-standalone with connect-avro-standalone.properties and MongoSourceConnector.properties so that Connect would write data written to MongoDB into a Kafka topic.

While trying to make progress I hit the error below, and I couldn't find an answer, so I'm asking here.

This is what I'm running:

bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties share/confluent-hub-components/mongodb-kafka-connect-mongodb/etc/MongoSourceConnector.properties
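
The connector itself sits under share/confluent-hub-components and was installed with the Confluent Hub client; a minimal sketch of that step, run from the Confluent Platform home directory (the version tag is an assumption; check Confluent Hub for the current release):

# Hypothetical install step for the connector plugin;
# the version tag 1.0.0 is an assumption -- check Confluent Hub for the current one.
confluent-hub install mongodb/kafka-connect-mongodb:1.0.0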

This is connect-avro-standalone.properties:

# Sample configuration for a standalone Kafka Connect worker that uses Avro serialization and
# integrates with the Schema Registry. This sample configuration assumes a local installation of
# Confluent Platform with all services running on their default ports.

# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

# The internal converter used for offsets and config data is configurable and must be specified,
# but most users will always want to use the built-in default. Offset and config data is never
# visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets

# Confluent Control Center Integration -- uncomment these lines to enable Kafka client interceptors
# that will report audit data that can be displayed and analyzed in Confluent Control Center
# producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
# consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
#rest.host.name=
#rest.port=8083

# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=
#rest.advertised.port=

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
# Replace the relative path below with an absolute path if you are planning to start Kafka Connect from within a
# directory other than the home directory of Confluent Platform.
plugin.path=share/java,/Users/anton/Downloads/confluent-5.3.2/share/confluent-hub-components

This is MongoSourceConnector.properties:

name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1

# Connection and source configuration
connection.uri=mongodb://localhost:27017
database=test
collection=test
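
(For completeness, a hedged sketch of optional source settings this connector documents; the option names below are from the connector docs of this era and may differ by version:)

# Optional source settings (an illustrative sketch, not part of my config):
# copy.existing=true                                   # snapshot existing documents before streaming
# pipeline=[{"$match": {"operationType": "insert"}}]   # filter which change events are published
# topic.prefix=mongo                                   # topics become <prefix>.<database>.<collection>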

This is the main error I'm getting:

[2020-01-02 18:55:11,546] ERROR WorkerSourceTask{id=mongo-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
com.mongodb.MongoCommandException: Command failed with error 40573 (Location40573): 'The $changeStream stage is only supported on replica sets' on server localhost:27017. The full response is {"ok": 0.0, "errmsg": "The $changeStream stage is only supported on replica sets", "code": 40573, "codeName": "Location40573"}

Comments:

Have you tried the Debezium Mongo connector?

Not yet. I'm not sure whether a Mongo database without a replica set works with the connector. I'll set up a replica set, try again, and check whether it works. Thanks.

Answer 1:

"The $changeStream stage is only supported on replica sets"

You need to make your Mongo database a replica set so that the connector can read the oplog.

https://dba.stackexchange.com/questions/243780/converting-mongodb-instance-from-standalone-to-replica-set-and-backing-up
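
A minimal sketch of that conversion for a local dev machine (the dbpath, port, and set name rs0 are assumptions; see the linked answer for a full walkthrough):

# Restart mongod as a single-node replica set; dbpath and port are assumptions for a local setup
mongod --dbpath /data/db --port 27017 --replSet rs0

# Then initiate the set once from the mongo shell:
# > rs.initiate()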

Comments:

Ah, yes, I see. The mongodb-kafka-source connector uses the oplog to write data into Kafka, so a replica-set Mongo database is mandatory. Thanks a lot, cricket.

@cricket_007/@Anton: Hi, do you have this setup somewhere on GitHub? I have a similar case to implement: I have to capture the MongoDB change stream into a Kafka connector and then into a Kafka broker, and a Java service consuming from the Kafka broker will process the change-stream events. How do I install the MongoDB-Kafka source connector?

Answer 2:

The MongoDB change stream option is only available on a replica set. However, you can convert a standalone installation into a single-node replica set with the two steps below.

1. Locate the mongod.conf file and add the replica set details

Add the following replica set details to the mongod.conf file:

replication:
  replSetName: "<replica-set name>"

Example:

replication:
  replSetName: "rs0"

Note: for a MongoDB installation via brew, the file is at /usr/local/etc/mongod.conf

2. Initiate the replica set with rs.initiate()

Log in to the MongoDB shell and run the command rs.initiate(); this initiates your replica set. A successful initiation logs something like the following:

> rs.initiate()
{
    "info2" : "no configuration specified. Using a default configuration for the set",
    "me" : "127.0.0.1:27017",
    "ok" : 1,
    "$clusterTime" : {
        "clusterTime" : Timestamp(1577545731, 1),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    },
    "operationTime" : Timestamp(1577545731, 1)
}

With these two simple steps, you have a MongoDB replica set running with a single node.
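
As a quick check (a sketch assuming the set name rs0 from the example above), you can confirm from the mongo shell that the node came up as primary:

// In the mongo shell: the single member's state should read "PRIMARY"
rs.status().members[0].stateStr   // expected: "PRIMARY"

Once the node is PRIMARY, it can also help to name the set explicitly in the connector's connection.uri, e.g. connection.uri=mongodb://localhost:27017/?replicaSet=rs0.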

Reference: https://onecompiler.com/posts/3vchuyxuh/enabling-replica-set-in-mongodb-with-just-one-node

Comments:

Worked well for me.

Answer 3:

This is what helped in my case (macOS env):

I. Install run-rs, a zero-config MongoDB runner. It starts a replica set with no non-Node dependencies, not even MongoDB itself.

npm install -g run-rs   # or: yarn global add run-rs

II. Use the connection string:

mongodb://localhost:27017,localhost:27018,localhost:27019/YOUR_DB_NAME?replicaSet=rs&retryWrites=false
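
A usage sketch (the flags shown are assumptions based on run-rs's docs; run-rs --help lists the supported options):

# Start a 3-node replica set on ports 27017-27019; -v pins the MongoDB version,
# --keep preserves data between runs (both flags are assumptions -- check run-rs --help)
run-rs -v 4.0.0 --keep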
