使用 Docker Compose 创建 Kafka-Connect 集群以供 ksqlDB 使用

Posted

技术标签:

【中文标题】使用 Docker Compose 创建 Kafka-Connect 集群以供 ksqlDB 使用【英文标题】:Create Kafka-Connect cluster with Docker Compose to be used by ksqlDB 【发布时间】:2021-08-03 13:52:11 【问题描述】:

我基本上尝试做的是使用 Docker Compose 拥有多个 Kafka Connect 实例。我希望 ksqlDB 使用这个集群。目前,它们都在一台机器上运行,但最终我想将它部署到多节点环境。我的问题是 ksqlDB 显然找不到 Kafka Connect 集群。 KSQL_KSQL_CONNECT_URL 代表单个 Kafka Connect 实例的 URL。不提供此变量会导致默认值,即 localhost:8083。

我找到了this docker-compose file,我认为它可以做我想做的事情:ksqlDB 和多个 Kafka Connect 实例。不幸的是,它对我没有太大帮助,因为它使用的是旧版本的 KSQL Server。这是我的 docker-compose 文件:

---
version: '3'

services:

  ksqldb-server-connect-test:
    image: confluentinc/ksqldb-server:0.15.0
    hostname: ksqldb-server-connect-test
    container_name: ksqldb-server-connect-test
    #ports:
    #  - "8088:8088"
    network_mode: "host"
    environment:
      KSQL_KSQL_SERVICE_ID: "default_"
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: localhost:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://localhost:8081
      #KSQL_KSQL_CONNECT_URL: http://localhost:8083
      
  ksqldb-cli-connect-test:
    image: confluentinc/ksqldb-cli:0.15.0
    container_name: ksqldb-cli-connect-test
    network_mode: "host"
    depends_on:
      - ksqldb-server-connect-test
    entrypoint: /bin/sh
    tty: true

  schema-registry-connect-test:
    image: confluentinc/cp-schema-registry:6.0.1
    container_name: schema-registry-connect-test
    network_mode: "host"
    #ports:
    #  - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: localhost:9092
    restart: always
    
  kafka-connect-1:
    image: confluentinc/cp-kafka-connect-base:6.0.1
    container_name: kafka-connect-1
    network_mode: "host"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "localhost:9092"
      CONNECT_REST_PORT: 8082
      CONNECT_GROUP_ID: kafka-connect-test
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs-test
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets-test
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status-test
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://localhost:8081'
      CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %Xconnector.context%m (%c:%L)%n"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_PARTITIONS: "25"
      CONNECT_STATUS_STORAGE_PARTITIONS: "5"
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
    volumes:
      - $PWD/data/connect-jars/:/usr/share/java/kafka-connect-jdbc/jars/
      - $PWD/jmx:/usr/app/
      
  kafka-connect-2:
    image: confluentinc/cp-kafka-connect-base:6.0.1
    container_name: kafka-connect-2
    network_mode: "host"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "localhost:9092"
      CONNECT_REST_PORT: 8084
      CONNECT_GROUP_ID: kafka-connect-test
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs-test
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets-test
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status-test
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://localhost:8081'
      CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %Xconnector.context%m (%c:%L)%n"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_PARTITIONS: "25"
      CONNECT_STATUS_STORAGE_PARTITIONS: "5"
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
    volumes:
      - $PWD/data/connect-jars/:/usr/share/java/kafka-connect-jdbc/jars/
      - $PWD/jmx:/usr/app/

请注意,我使用 network_mode: "host" 是因为 Kafka 集群本身不在 Docker 容器中运行,因此在我的情况下,这可以简化与 Kafka 的通信。

是否有人对如何仅使用 docker-compose 将 ksqlDB 连接到 Kafka Connect 集群有想法或解决方案?

【问题讨论】:

您能否澄清一下您是否希望在一个 Connect 集群中有两个连接工作程序?还是两个单独的 Kafka Connect 工作人员? @RobinMoffatt 我需要实现的是容错。我想有e。 G。两台或三台机器,每台都有一个 Connect 实例。现在(测试目的),在一台机器上拥有两个 Connect 实例就足够了。我面临的主要问题是如何将 ksqlDB 与 > 1 个 Kafka Connect 实例连接起来。要求基本上是负载平衡,如果一个实例由于某种原因死亡,另一个实例可以接管额外的负载,直到启动一个新实例。 上次我检查过,使用localhost:9092 / localhost:8081 使容器连接到自己,而不是“主机网络”(或注册表等其他容器)上的服务,我猜那是问题的一部分。忘记 ksql,因为您没有显示任何日志,Connect 容器(或模式注册表)是否真的启动了? @OneCricketeer 我在 docker-compose 文件中设置了 network_mode ="host",这使得容器连接到主机的本地主机。 假设您的主机是 Linux,也许。仍然没有回答我的问题 - 其他容器是否启动正常并实际与代理连接?除此之外,使用host.docker.internal:9092没有主机网络模式对于我们这些不运行Linux的人来说更容易重现 【参考方案1】:

我需要实现的是容错。

好的,所以您需要在单个 Kafka Connect 组中 >1 个 Kafka Connect 工作人员。这就是您配置相同存储主题和group.id 时所获得的结果 ?

所以问题是如何让 ksqlDB 连接到 Kafka Connect 工作人员集群。由于 Kafka Connect 使用 Kafka 本身来保存配置,所以它连接到哪个工作人员并不重要。 ksql.connect.url(因此 docker 中的 KSQL_KSQL_CONNECT_URL 环境变量)是执行此操作的正确方法,但从文档中不清楚您是否可以指定多个值。

如果你不能,那么我猜你需要在工作人员面前放置一个无状态负载均衡器并将 ksqlDB 指向那里。

此外,主机名将是容器的名称 (kafka-connect-1 / kafka-connect-2),而不是 localhost

【讨论】:

很遗憾,您不能在 ksl.connect.url 中指定多个 URL。如果我这样做,请连接到 ksql-cli 并运行“显示连接器;”,它会显示“io.confluent.ksql.util.KsqlServerException:org.apache.hc.client5.http.ClientProtocolException:未指定目标主机”。我已经尝试过了,我应该首先告诉。我将在接下来的几天研究负载均衡器的方法。谢谢 是的,我与一位 ksqlDB 工程师确认了 - 负载均衡器是要走的路。随时通过github.com/confluentinc/ksql 提出增强请求 如果有人对此感兴趣或遇到同样的问题:我们很可能会采用在每个 ksqlDB 实例中以嵌入式模式安装 Kafka Connect 的方法,因为一个可取的要求是减少要管理的应用程序数量。

以上是关于使用 Docker Compose 创建 Kafka-Connect 集群以供 ksqlDB 使用的主要内容,如果未能解决你的问题,请参考以下文章

docker-compose 网络问题

docker-compose创建的网段与内网冲突时的解决方法

Docker Compose 创建yml 简单试例

docker学习 docker-compose使用示例

docker-compose up与docker-compose up -d

如何通过 ssh 进入使用 docker-compose 创建的服务