有没有办法为这个 debezium 连接器设置一个接收器和源连接器?

Posted

技术标签:

【中文标题】有没有办法为这个 debezium 连接器设置一个接收器和源连接器?【英文标题】:Is there a way to setup a sink and source connector for this debezium connector? 【发布时间】:2021-04-19 13:57:31 【问题描述】:

我正在使用这里找到的 debezium-connector:https://repo1.maven.org/maven2/io/debezium/debezium-connector-oracle/1.4.0.Final/debezium-connector-oracle-1.4.0.Final-plugin.tar.gz

我正在按照 docker-compose 的这些说明进行操作:https://github.com/confluentinc/demo-scene/blob/master/oracle-and-kafka/docker-compose.yml

我已经通过 confluent-hub 为 jdbc-connector 完成了安装,但我不知道如何为 debezium 执行同样的操作。仅仅把它添加到 /usr/share/java 然后运行并不能解决问题。

所以我的 docker-compose 是:

---
# Compose file format version; every container is declared under `services`.
version: "2"
services:
  # ZooKeeper node; the kafka service connects to it at zookeeper:2181.
  zookeeper:
    container_name: zookeeper
    hostname: zookeeper
    image: confluentinc/cp-zookeeper:6.0.1
    ports:
      - '2181:2181'
    volumes:
      # Data and log directories are bind-mounted from the host.
      - /dados/persistence/zookeeper/data:/var/lib/zookeeper/data
      - /dados/persistence/zookeeper/log:/var/lib/zookeeper/log
    environment:
      ZOOKEEPER_CLIENT_PORT: '2181'
      ZOOKEEPER_TICK_TIME: '2000'

  # Kafka broker (cp-server image). The service is named `kafka`, while its
  # hostname and container name are `broker`.
  kafka:
    container_name: broker
    hostname: broker
    image: confluentinc/cp-server:6.0.1
    depends_on:
      - zookeeper
    ports:
      - '9092:9092'
      - '9101:9101'
    volumes:
      # Broker data directory is bind-mounted from the host.
      - /dados/persistence/broker/data:/var/lib/kafka/data
    environment:
      KAFKA_BROKER_ID: '1'
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      # Two plaintext listeners are advertised: broker:29092 and localhost:9092.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: '0'
      # Replication settings are all 1; only one broker is defined in this file.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1'
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: '1'
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '1'
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: '1'
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: '1'
      # JMX port matches the published 9101 port above.
      KAFKA_JMX_PORT: '9101'
      KAFKA_JMX_HOSTNAME: 'localhost'
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      # Confluent metrics reporter settings.
      KAFKA_METRIC_REPORTERS: 'io.confluent.metrics.reporter.ConfluentMetricsReporter'
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: 'kafka:29092'
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: '1'
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  # Confluent Schema Registry, published on port 8081.
  schema-registry:
    image: confluentinc/cp-schema-registry:6.0.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      # Connect to Kafka directly instead of going through ZooKeeper: the
      # ZooKeeper-based SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL setting is
      # deprecated. Uses the same kafka:29092 address as the other services.
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:29092'

  # Kafka Connect worker. Connector plugins are only discovered in the
  # directories listed in CONNECT_PLUGIN_PATH.
  kafka-connect:
    image: cnfldemos/cp-server-connect-datagen:0.4.0-6.0.1
    hostname: connect
    container_name: kafka-connect
    volumes:
      - /dados/packages/confluent-hub/share/confluent-hub-components:/usr/share/confluent-hub-components/custom
      # Host directory for manually provided connector files; its container
      # path must appear in CONNECT_PLUGIN_PATH below to be scanned.
      - /dados/persistence/kafka-connect/jars:/etc/kafka-connect/jars
    depends_on:
      - zookeeper
      - kafka
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'kafka:29092'
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      # String keys, Avro values registered in Schema Registry.
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %Xconnector.context%m (%c:%L)%n"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      # Internal topics use replication factor 1.
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      # /etc/kafka-connect/jars is the target of the jars volume above; without
      # it in this list, plugins placed in that directory are never loaded.
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/usr/share/confluent-hub-components/custom,/etc/kafka-connect/jars"
      # NOTE(review): this container path is not covered by any volume above;
      # confirm the Oracle Instant Client actually exists there in the image.
      LD_LIBRARY_PATH: '/usr/share/java/debezium-connector-oracle/instantclient_19_6/'

  # Confluent Control Center web UI, published on port 9021.
  control-center:
    image: confluentinc/cp-enterprise-control-center:6.0.1
    hostname: control-center
    container_name: control-center
    depends_on:
      - kafka
      - schema-registry
      - kafka-connect
      - ksqldb
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'kafka:29092'
      CONTROL_CENTER_CONNECT_CLUSTER: 'kafka-connect:8083'
      # Server-side URLs use Compose service names, like the rest of this file,
      # instead of a hard-coded host IP.
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb:8088"
      # NOTE(review): the advertised URL is presumably the address the browser
      # uses, so the host IP is kept as-is — confirm it matches the Docker host.
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://10.58.0.207:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  # ksqlDB server listening on port 8088; it is configured with the Kafka,
  # Kafka Connect and Schema Registry addresses below.
  ksqldb:
    container_name: ksqldb-server
    hostname: ksqldb
    image: confluentinc/cp-ksqldb-server:6.0.1
    depends_on:
      - kafka
      - kafka-connect
    ports:
      - '8088:8088'
    environment:
      KSQL_CONFIG_DIR: '/etc/ksql'
      KSQL_LISTENERS: 'http://0.0.0.0:8088'
      KSQL_BOOTSTRAP_SERVERS: 'kafka:29092'
      KSQL_KSQL_CONNECT_URL: 'http://kafka-connect:8083'
      KSQL_KSQL_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      # Auto-create the processing-log stream and topic.
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'

  # ksqlDB CLI container; the entrypoint is a shell with a TTY allocated.
  ksqldb-cli:
    container_name: ksqldb-cli
    image: confluentinc/cp-ksqldb-cli:6.0.1
    entrypoint: '/bin/sh'
    tty: true
    depends_on:
      - kafka
      - kafka-connect
      - ksqldb

  # Helper container: waits for Kafka and Schema Registry to be ready, sleeps
  # briefly, then idles on `tail -f /dev/null`.
  ksql-datagen:
    container_name: ksql-datagen
    hostname: ksql-datagen
    image: confluentinc/ksqldb-examples:6.0.1
    depends_on:
      - ksqldb
      - kafka
      - schema-registry
      - kafka-connect
    # Folded scalar: the lines below are joined with single spaces into one
    # command string.
    command: >-
      bash -c 'echo Waiting for Kafka to be ready... &&
      cub kafka-ready -b broker:29092 1 40 &&
      echo Waiting for Confluent Schema Registry to be ready... &&
      cub sr-ready schema-registry 8081 40 &&
      echo Waiting a few seconds for topic creation to finish... &&
      sleep 11 &&
      tail -f /dev/null'
    environment:
      KSQL_CONFIG_DIR: '/etc/ksql'
      STREAMS_BOOTSTRAP_SERVERS: 'kafka:29092'
      STREAMS_SCHEMA_REGISTRY_HOST: 'schema-registry'
      STREAMS_SCHEMA_REGISTRY_PORT: '8081'

  # Confluent REST Proxy, published on port 8082.
  rest-proxy:
    image: confluentinc/cp-kafka-rest:6.0.1
    depends_on:
      - kafka
      - schema-registry
    ports:
      # Quoted like every other port mapping in this file; unquoted
      # digits-and-colon values can be mis-typed by YAML 1.1 parsers.
      - "8082:8082"
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'kafka:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

【问题讨论】:

你有一个挂载路径 /etc/kafka-connect/jars,但没有在 CONNECT_PLUGIN_PATH 中使用它 【参考方案1】:

你需要将/etc/kafka-connect/jars添加到CONNECT_PLUGIN_PATH

【讨论】:

我找到的唯一“jars”目录是:/var/lib/docker/overlay2/ad6e90f2efe793f95c252a584d4ef8188e477187b66e095d1cc7a92c3a7d78ef/diff/etc/kafka-connect/jars、/var/lib/docker/overlay2/ab6f295bc1bbc45caf41ffc7eefcfd43c7cf975ad51fb130e5f1f7298b9d3485/merged/etc/kafka-connect/jars 和 /dados/persistence/kafka-connect/jars。第一个和第二个来自我运行“docker-compose up -d”时下载的镜像。最后一个来自我创建的持久化目录,用于重定向来自 broker、kafka-connect 和 zookeeper 的数据。 你把你的 Debezium JAR 放在哪里了?是在你主机上的本地文件夹中吗? 我已经把它放在“/usr/share/java”中了,它位于“debezium-connector-oracle”文件夹中。而且我已经在我的 docker-compose 文件中添加了“usr/share/java”。 我不太明白(而且评论区并不适合讨论这个)。你能编辑你的问题,清楚地说明你存放 Debezium JAR 的本地文件夹吗?我假设你已将该文件夹挂载到你的 Docker 容器中?

以上是关于有没有办法为这个 debezium 连接器设置一个接收器和源连接器?的主要内容,如果未能解决你的问题,请参考以下文章

有没有办法删除debezium mysql连接器的现有任务并用新任务替换它

无法为 SqlServer 连接器设置 debezium 嵌入式引擎

Debezium Kafka 连接器 mongodb

Debezium 时间戳问题,无法转换为本地时区

需要 Debezium 连接器中用于 postgres 插入事件的主键信息

是否可以为一个带有使用 debezium 和 kafka 的表的数据库创建一个 Elasticsearch 索引?