How to connect Debezium to MongoDB in Docker?

Posted: 2021-08-01 05:16:06

Question:

I'm using Debezium to stream changes from MongoDB to Kafka, and I'm running into what looks like a permissions problem when Debezium tries to access the MongoDB replica set.

docker-compose.yml

version: "3"
services: 

    mongodb:
        image: mongo
        container_name: mongodb
        hostname: mongodb
        restart: always
        ports:
            - 27017:27017
        volumes:
            - ./mongodb/data/admin:/data/admin
            - ./mongodb/data/keyfile:/data/keyfile
        environment:
            MONGO_INITDB_ROOT_USERNAME: admin
            MONGO_INITDB_ROOT_PASSWORD: admin
        command: --bind_ip_all --keyFile /data/keyfile/mongodb-keyfile --replSet rs0

    zookeeper:
        image: confluentinc/cp-zookeeper:6.1.1
        hostname: zookeeper
        container_name: zookeeper
        ports:
            - 2181:2181
        environment:
            ZOOKEEPER_CLIENT_PORT: 2181
            ZOOKEEPER_TICK_TIME: 2000

    kafka:
        image: confluentinc/cp-server:6.1.1
        hostname: kafka
        container_name: kafka
        depends_on:
            - zookeeper
        ports:
            - 9092:9092
            - 9101:9101
        environment:
            KAFKA_BROKER_ID: 1
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
            KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
            KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
            KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
            KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
            KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
            KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
            KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
            KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
            KAFKA_JMX_PORT: 9101
            KAFKA_JMX_HOSTNAME: localhost
            KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
            CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092
            CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
            CONFLUENT_METRICS_ENABLE: "true"
            CONFLUENT_SUPPORT_CUSTOMER_ID: anonymous

    schema-registry:
        image: confluentinc/cp-schema-registry:6.1.1
        hostname: schema-registry
        container_name: schema-registry
        depends_on:
            - kafka
        ports:
            - 8081:8081
        environment:
            SCHEMA_REGISTRY_HOST_NAME: schema-registry
            SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
            SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

    connect:
        image: tcgplayer/cp-kafka-connect:6.1.1-debezium-mongodb-1.5.0
        build:
            context: ./connect
            dockerfile: Dockerfile
        hostname: connect
        container_name: connect
        depends_on:
            - kafka
            - schema-registry
        ports:
            - 8083:8083
        environment:
            CONNECT_BOOTSTRAP_SERVERS: kafka:29092
            CONNECT_REST_ADVERTISED_HOST_NAME: connect
            CONNECT_REST_PORT: 8083
            CONNECT_GROUP_ID: compose-connect-group
            CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
            CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
            CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
            CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
            CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
            CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
            CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
            CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
            CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
            CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
            CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-6.1.1.jar
            CONNECT_PRODUCER_INTERCEPTOR_CLASSES: io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
            CONNECT_CONSUMER_INTERCEPTOR_CLASSES: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
            CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
            CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR

    control-center:
        image: confluentinc/cp-enterprise-control-center:6.1.1
        hostname: control-center
        container_name: control-center
        depends_on:
            - kafka
            - schema-registry
            - connect
        ports:
            - 9021:9021
        environment:
            CONTROL_CENTER_BOOTSTRAP_SERVERS: kafka:29092
            CONTROL_CENTER_CONNECT_CLUSTER: connect:8083
            CONTROL_CENTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
            CONTROL_CENTER_REPLICATION_FACTOR: 1
            CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
            CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
            CONFLUENT_METRICS_TOPIC_REPLICATION: 1
            PORT: 9021
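The `mongodb` service starts with `--keyFile /data/keyfile/mongodb-keyfile`, which MongoDB expects for internal authentication when access control (enabled here by `MONGO_INITDB_ROOT_USERNAME`) is combined with `--replSet`. The file therefore has to exist on the host, under the mounted `./mongodb/data/keyfile` directory, before the stack starts. Below is a minimal Node.js sketch, assuming the path from the compose file; the helper names are hypothetical. It generates base64 content within MongoDB's 6 to 1024 character limit and makes the file owner-read-only.

```javascript
const crypto = require("crypto");
const fs = require("fs");
const path = require("path");

// MongoDB keyfile content must be 6 to 1024 characters from the base64 alphabet.
// 756 random bytes encode to exactly 1008 base64 characters (756 / 3 * 4), with no "=" padding.
function generateKeyfileContent(numBytes = 756) {
  return crypto.randomBytes(numBytes).toString("base64");
}

// Hypothetical helper: writes the keyfile once and returns false
// (leaving the existing file untouched) if it is already there.
function writeKeyfile(filePath, content) {
  if (fs.existsSync(filePath)) return false;
  fs.mkdirSync(path.dirname(filePath), { recursive: true });
  fs.writeFileSync(filePath, content + "\n");
  // Owner read-only: MongoDB rejects keyfiles readable by group or others.
  fs.chmodSync(filePath, 0o400);
  return true;
}
```

Usage: `writeKeyfile("./mongodb/data/keyfile/mongodb-keyfile", generateKeyfileContent())`, run from the directory containing `docker-compose.yml`. Depending on how the image drops privileges, the file may also need to be owned by the container's `mongodb` user. In the official image this is commonly UID 999, but treat that as an assumption and check it against the image you use.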

All the containers start up fine. I can connect to MongoDB with Compass using the admin account, and I can also initialize the replica set through the mongo CLI.

docker exec

docker exec mongodb bash -c 'mongo -u admin -p admin < /data/admin/replica.js'

replica.js

config = {
    "_id": "rs0",
    "members": [
        {
            "_id": 0,
            "host": "mongodb:27017"
        }
    ]
}

rs.initiate(config)
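The replica set name and member host in `replica.js` have to line up with the connector's `mongodb.hosts` value (`rs0/mongodb:27017`, in the Debezium 1.x form `<replicaSetName>/<host:port>`). Because the MongoDB driver discovers members from the replica set configuration, the member host (`mongodb:27017`) must also be resolvable from inside the `connect` container. The sketch below, using hypothetical helper names, builds the `rs.initiate()` document and derives the matching `mongodb.hosts` string from it, so the two values cannot drift apart.

```javascript
// Hypothetical helper: builds the document passed to rs.initiate() in replica.js.
function replicaSetConfig(setName, hosts) {
  return {
    _id: setName,
    members: hosts.map((host, i) => ({ _id: i, host })),
  };
}

// Hypothetical helper: derives Debezium's "mongodb.hosts" value
// ("<replicaSetName>/<host:port>,...") from the same replica set config.
function debeziumHosts(config) {
  return `${config._id}/${config.members.map((m) => m.host).join(",")}`;
}

const config = replicaSetConfig("rs0", ["mongodb:27017"]);
console.log(JSON.stringify(config)); // {"_id":"rs0","members":[{"_id":0,"host":"mongodb:27017"}]}
console.log(debeziumHosts(config)); // rs0/mongodb:27017
```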

But after I create the Debezium connector, it throws an exception when it tries to read the replica set.

POST to the Kafka Connect REST API

curl -X POST -H "Content-Type: application/json" -d @debezium-mongodb.json http://localhost:8083/connectors

debezium-mongodb.json

{
    "name": "debezium-mongodb",
    "config": {
        "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
        "mongodb.name": "mongodb",
        "mongodb.hosts": "rs0/mongodb:27017",
        "mongodb.user": "admin",
        "mongodb.password": "admin"
    }
}

Exception

[2021-05-10 19:04:21,453] ERROR Error while attempting to sync 'rs0.config.system.sessions':  (io.debezium.connector.mongodb.MongoDbSnapshotChangeEventSource)
com.mongodb.MongoQueryException: Query failed with error code 13 and error message 'not authorized on config to execute command { find: "system.sessions", filter: {}, $db: "config", $clusterTime: { clusterTime: Timestamp(1620673459, 1), signature: { hash: BinData(0, 9B0A6379ED7BA2DF94F20FCA8AAE76877067E3D6), keyId: 6960734006342057988 } }, lsid: { id: UUID("e8418d9a-0f05-4c8c-9c34-a8a806183fe4") }, $readPreference: { mode: "primaryPreferred" } }' on server mongodb:27017
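The curl call above can also be scripted, together with a status check, so a failing snapshot shows up without tailing the Connect log. This is a minimal sketch, assuming Node.js 18+ (for the global `fetch`) and Connect at `http://localhost:8083` as in the curl command. It uses the standard Kafka Connect REST endpoints `POST /connectors` and `GET /connectors/{name}/status`. The helper names are hypothetical.

```javascript
// Hypothetical helper: builds the POST /connectors body, mirroring debezium-mongodb.json.
function connectorPayload({ name, logicalName, hosts, user, password }) {
  return {
    name,
    config: {
      "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
      "mongodb.name": logicalName,
      "mongodb.hosts": hosts,
      "mongodb.user": user,
      "mongodb.password": password,
    },
  };
}

// Sends a request to the Connect REST API; Connect reports errors as
// JSON like {"error_code": 409, "message": "..."}.
async function connectRequest(url, options = {}) {
  const res = await fetch(url, options);
  const body = await res.json();
  if (!res.ok) throw new Error(`HTTP ${res.status}: ${body.message}`);
  return body;
}

function registerConnector(connectUrl, payload) {
  return connectRequest(`${connectUrl}/connectors`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(payload),
  });
}

function connectorStatus(connectUrl, name) {
  return connectRequest(`${connectUrl}/connectors/${encodeURIComponent(name)}/status`);
}

// Collects the stack traces of any FAILED tasks from a status response.
function failedTaskTraces(status) {
  return (status.tasks || []).filter((t) => t.state === "FAILED").map((t) => t.trace);
}
```

For example, `registerConnector("http://localhost:8083", connectorPayload({ name: "debezium-mongodb", logicalName: "mongodb", hosts: "rs0/mongodb:27017", user: "admin", password: "admin" }))`. After the task has had time to start, pass the result of `connectorStatus(...)` to `failedTaskTraces`. If the task fails, its trace should contain an error like the one above.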

Answer 1:

This problem occurs because the MongoDB user does not have the privileges required to read from the config database.

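The official Debezium documentation, in its section on reading from the MongoDB oplog, states: "You must also have a MongoDB user that has the appropriate roles to read the admin database where the oplog can be read. Additionally, the user must also be able to read the config database in the configuration server of a sharded cluster and must have listDatabases privilege action."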
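The requirements quoted above can be expressed as a dedicated user for Debezium. The sketch below builds a MongoDB `createUser` command document. The user name and password are hypothetical, and the role selection is an assumption based on the quote. `readAnyDatabase` (on `admin`) includes the `listDatabases` action but excludes `local` and `config`, so those two are granted separately. The oplog itself lives in `local.oplog.rs`.

```javascript
// Hypothetical helper: builds a createUser command for a Debezium user.
// The user name, password, and role choice are assumptions; adjust them to your setup.
function debeziumUserCommand(user, pwd) {
  return {
    createUser: user,
    pwd,
    roles: [
      // Read access to all databases except local/config, plus the listDatabases action.
      { role: "readAnyDatabase", db: "admin" },
      // The oplog is stored in local.oplog.rs.
      { role: "read", db: "local" },
      // The config database, per the documentation quoted above.
      { role: "read", db: "config" },
    ],
  };
}

const cmd = debeziumUserCommand("debezium", "dbz");
console.log(JSON.stringify(cmd, null, 2));
```

The printed document can be run as the admin user in the mongo shell with `db.getSiblingDB("admin").runCommand(...)`. After that, `mongodb.user` and `mongodb.password` in the connector config can point at the new account. Note that MongoDB's built-in `read` role does not cover `system.*` collections, so `config.system.sessions` may remain unreadable. If the error persists, restricting capture to the application databases (for example with the connector's `database.include.list` option in Debezium 1.x) is worth checking against the Debezium documentation for your version.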
