How to connect Debezium to MongoDB in Docker?
Posted: 2021-08-01 05:16:06
Question: I am using Debezium to stream changes from MongoDB to Kafka, and I run into some kind of permission problem when Debezium tries to access the MongoDB replica set.
docker-compose.yml
version: "3"
services:
  mongodb:
    image: mongo
    container_name: mongodb
    hostname: mongodb
    restart: always
    ports:
      - 27017:27017
    volumes:
      - ./mongodb/data/admin:/data/admin
      - ./mongodb/data/keyfile:/data/keyfile
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: admin
    command: --bind_ip_all --keyFile /data/keyfile/mongodb-keyfile --replSet rs0
  zookeeper:
    image: confluentinc/cp-zookeeper:6.1.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - 2181:2181
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-server:6.1.1
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
      - 9101:9101
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: "true"
      CONFLUENT_SUPPORT_CUSTOMER_ID: anonymous
  schema-registry:
    image: confluentinc/cp-schema-registry:6.1.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - kafka
    ports:
      - 8081:8081
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
  connect:
    image: tcgplayer/cp-kafka-connect:6.1.1-debezium-mongodb-1.5.0
    build:
      context: ./connect
      dockerfile: Dockerfile
    hostname: connect
    container_name: connect
    depends_on:
      - kafka
      - schema-registry
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:29092
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-6.1.1.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
  control-center:
    image: confluentinc/cp-enterprise-control-center:6.1.1
    hostname: control-center
    container_name: control-center
    depends_on:
      - kafka
      - schema-registry
      - connect
    ports:
      - 9021:9021
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: kafka:29092
      CONTROL_CENTER_CONNECT_CLUSTER: connect:8083
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
All containers start up fine. I can connect to MongoDB with Compass using the admin account, and I can also initialize the replica set through the mongo CLI.
docker exec
docker exec mongodb bash -c 'mongo -u admin -p admin < /data/admin/replica.js'
replica.js
config = {
  "_id": "rs0",
  "members": [
    {
      "_id": 0,
      "host": "mongodb:27017"
    }
  ]
}
rs.initiate(config)
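Before creating the connector, it can help to confirm that the replica set actually elected a primary. The following is a minimal sketch for the mongo shell, not part of the original question: `findPrimary` is a hypothetical helper name, and it relies only on the `members[].name` and `members[].stateStr` fields that `rs.status()` returns.

```javascript
// Hypothetical helper: return the host of the PRIMARY member, or null if none.
function findPrimary(status) {
  // rs.status() has no `members` array before the replica set is initiated.
  const primary = (status.members || []).find((m) => m.stateStr === "PRIMARY");
  return primary ? primary.name : null;
}

// `rs` exists only inside the mongo shell; the guard lets the file load elsewhere.
if (typeof rs !== "undefined") {
  print("primary: " + findPrimary(rs.status()));
}
```

Assuming the script is saved next to replica.js, it could be piped into the shell the same way, with `docker exec mongodb bash -c 'mongo -u admin -p admin < /data/admin/<script name>'`.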
But after I create the Debezium connector, it throws an exception while trying to read the replica set.
POST to the Connect REST API
curl -X POST -H "Content-Type: application/json" -d @debezium-mongodb.json http://localhost:8083/connectors
debezium-mongodb.json
{
  "name": "debezium-mongodb",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.name": "mongodb",
    "mongodb.hosts": "rs0/mongodb:27017",
    "mongodb.user": "admin",
    "mongodb.password": "admin"
  }
}
Exception
[2021-05-10 19:04:21,453] ERROR Error while attempting to sync 'rs0.config.system.sessions': (io.debezium.connector.mongodb.MongoDbSnapshotChangeEventSource)
com.mongodb.MongoQueryException: Query failed with error code 13 and error message 'not authorized on config to execute command find: "system.sessions", filter: , $db: "config", $clusterTime: clusterTime: Timestamp(1620673459, 1), signature: hash: BinData(0, 9B0A6379ED7BA2DF94F20FCA8AAE76877067E3D6), keyId: 6960734006342057988 , lsid: id: UUID("e8418d9a-0f05-4c8c-9c34-a8a806183fe4") , $readPreference: mode: "primaryPreferred" ' on server mongodb:27017
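Besides reading the worker log, the Kafka Connect REST API exposes a status endpoint, `GET /connectors/<name>/status`, that reports the connector state and any task stack traces. The sketch below is an illustration only: `failedTasks` and `checkConnector` are hypothetical helper names, and it assumes a runtime with a global `fetch` (Node.js 18+ or a browser). The question does not say whether this error actually marks the task as FAILED, so the log may remain the only place it shows up.

```javascript
// Hypothetical helper: pick the failed tasks out of a Connect status response.
function failedTasks(status) {
  return (status.tasks || [])
    .filter((task) => task.state === "FAILED")
    .map((task) => ({ id: task.id, trace: task.trace || "" }));
}

// Hypothetical helper: fetch the status of one connector from the REST API.
async function checkConnector(baseUrl, name) {
  const url = `${baseUrl}/connectors/${encodeURIComponent(name)}/status`;
  const response = await fetch(url);
  if (!response.ok) {
    throw new Error(`Connect REST API returned HTTP ${response.status}`);
  }
  const status = await response.json();
  return { connectorState: status.connector.state, failed: failedTasks(status) };
}
```

With the setup from the question, a call would look like `checkConnector("http://localhost:8083", "debezium-mongodb").then(console.log)`.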
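Answer 1: This problem occurs because the MongoDB user does not have the privileges required to read from the config database.
On reading data from the MongoDB oplog, the official Debezium documentation says: "You must also have a MongoDB user that has the appropriate roles to read the admin database where the oplog can be read. Additionally, the user must also be able to read the config database in the configuration server of a sharded cluster and must have listDatabases privilege action."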
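The requirements in that quote could be expressed as a dedicated MongoDB user, roughly as in the mongo shell sketch below. Everything named here is an assumption rather than something from the question or the Debezium documentation: the user name `debezium`, the placeholder password, the custom role name `listDatabasesOnly`, and the extra `read` role on `local` (where the oplog collection `oplog.rs` lives). The source does not confirm that these roles are enough to remove the `config.system.sessions` error, since MongoDB's built-in `read` role does not cover every system collection.

```javascript
// Hypothetical custom role: grants only the listDatabases action on the cluster.
const listDatabasesRole = {
  role: "listDatabasesOnly",
  privileges: [{ resource: { cluster: true }, actions: ["listDatabases"] }],
  roles: []
};

// Hypothetical connector user; replace the placeholder password before use.
const debeziumUser = {
  user: "debezium",
  pwd: "change-me",
  roles: [
    { role: "read", db: "admin" },
    { role: "read", db: "config" },
    { role: "read", db: "local" }, // assumption: the oplog is stored in `local`
    { role: "listDatabasesOnly", db: "admin" }
  ]
};

// `db` exists only inside the mongo shell; the guard lets the file load elsewhere.
if (typeof db !== "undefined") {
  const adminDb = db.getSiblingDB("admin");
  adminDb.createRole(listDatabasesRole);
  adminDb.createUser(debeziumUser);
}
```

The connector's `mongodb.user` and `mongodb.password` settings would then point at this user instead of the root admin account.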