Sprint 启动 kafka Consumer 无法连接到 kafka 容器

Posted

技术标签:

【中文标题】Sprint 启动 kafka Consumer 无法连接到 kafka 容器【英文标题】:Sprint boot kafka Consumer can not connect to the kafka container 【发布时间】:2020-03-12 20:06:15 【问题描述】:

我正在尝试部署 2 个 Spring Boot 应用程序(kafka 生产者和消费者)。当我将 Producer 部署到 docker 时一切正常,但是当我部署时,我的 Consumer 不起作用,因为没有与 kafka 容器的连接。

日志显示这个错误

2019-11-17 05:32:22.644  WARN 1 --- [main] o.a.k.c.NetworkClient: [Consumer clientId=consumer-1, groupId=exampleGroup] Connection to node -1 could not be established. Broker may not be available.

我的 docker-compose.yml 是

version: '3'

services:

  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    restart: always
    ports:
      - 2181:2181

  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    restart: always
    ports:
      - 9092:9092
    depends_on:
      - zookeeper
    links:
      - zookeeper:zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "topic1:1:1"

在我的 KafkaConfig 课程中:

@EnableKafka
@Configuration
public class KafkaConfig 

    @Bean
    public ConsumerFactory<String, String> consumerFactory()
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKERS);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "exampleGroup");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
      //  config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, KafkaConstants.ENABLE_AUTO_COMMIT_CONFIG);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, KafkaConstants.OFFSET_RESET_EARLIER);
       // config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, KafkaConstants.SESSION_TIMEOUT_MS);

        return new DefaultKafkaConsumerFactory<>(config);
    

还有常量类

public class KafkaConstants 

    public static String KAFKA_BROKERS = "localhost:9092";
    public static Integer MESSAGE_COUNT=1000;
    public static String TOPIC_NAME="demo";
    public static String GROUP_ID_CONFIG="exampleGroup";
    public static Integer MAX_NO_MESSAGE_FOUND_COUNT=100;
    public static String OFFSET_RESET_LATEST="latest";
    public static String OFFSET_RESET_EARLIER="earliest";
    public static Integer MAX_POLL_RECORDS=1;
    public static Integer SESSION_TIMEOUT_MS = 180000;
    public static Integer REQUEST_TIMEOUT_MS_CONFIG = 181000;
    public static String ENABLE_AUTO_COMMIT_CONFIG = "false";
    public static Integer AUTO_COMMIT_INTERVAL_MS_CONFIG = 8000;

当我在我的计算机上安装 zookepper 和 kafka 并使用 intellij 运行这 2 个 spring boot 应用程序时工作正常。问题是当我部署到本地 docker 时。

你能帮帮我吗?

更新

更新我的 docker-compose:

version: '3'

services:

  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    restart: always
    ports:
      - 2181:2181

  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    restart: always
    ports:
      - 9092:9092
    depends_on:
      - zookeeper
    links:
      - zookeeper:zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "ACC_GROUP_CREATE:1:1"

  consumer:
    image: micro1
    container_name: micro1
    depends_on:
      - kafka
    restart: always
    ports:
      - 8088:8088
    depends_on:
      - kafka
    links:
      - kafka:kafka

  producer:
    image: micro2
    container_name: micro2
    depends_on:
      - kafka
    restart: always
    ports:
      - 8087:8087
    depends_on:
      - kafka
    links:
      - kafka:kafka

工作正常!基于@hqt 的响应,但我不知道为什么我需要添加这些消费者/生产者行

【问题讨论】:

尝试通过端口提供代理 IP 地址 什么意思?在 KafkaConfig 类上,我已经指定了主机和端口 如果您将代码“部署到 docker”,则不能使用 localhost 作为 Kafka 连接,因为那是容器本身,而不是 kafka @cricket_007 我的 sprint 启动应用程序或 docker 容器的配置应该是什么,因为我也尝试在 BOOTSTRAP_SERVERS_CONFIG 上将主机/端口设置为 0.0.0.0:9092 并且有相同的连接问题跨度> 好吧,0.0.0.0 不是有效的 IP 地址。 kafka 容器名为 kafka ,因此您的应用应连接到 kafka:9092 【参考方案1】:

KAFKA_ADVERTISED_HOST_NAME 属性导致的问题。这是documentation,它解释了为什么 Kafka 需要广告地址。

关键是当你运行一个客户端时,你传递给它的代理 只是它要去的地方并获取有关经纪人的元数据 集群来自。 它将连接到的实际主机和 IP 读取/写入数据基于代理传回的数据 在那个初始连接中——即使它只是一个节点并且 返回的代理与连接的代理相同。

当您将KAFKA_ADVERTISED_HOST_NAME 设置为本地主机时:

您的应用程序在“Intellij”上运行,这意味着在主机环境中运行。该主机创建 Kafka 的容器,因此来自 localhost:9092 的访问将指向 Kafka 的容器。 当您的应用程序在容器内运行时,localhost:9092 表示容器本身。所以毫无意义。 (这个容器甚至没有任何进程监听 9092 端口)

在容器环境中运行 Web 应用程序时,将 KAFKA_ADVERTISED_HOST_NAME 属性更新为 kafka 将起作用。请注意,您的 Web 应用和 kafka 容器都必须在 docker 的网络上。

这是使用 Wurstmeister 的镜像运行 Kafka 集群的建议 docker-compose。

version: "2"
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - 2181:2181

  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - 9092:9092
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_CREATE_TOPICS: "topic1:1:1"

   web_app:
    # your definition of the web_app goes  here

然后就可以连接到容器环境内地址kafka:9092上的Kafka broker了。

【讨论】:

localhost 和 127.0.0.1 是一回事,那为什么会这样呢?公布的主机名应该是 kafka @cricket_007 我的错。在输入 docker-compose 部分的答案时。我在想他无法连接到主机环境上的kafka,这是不正确的。感谢您指出这个错误。 @hqt 将 KAFKA_ADVERTISED_HOST_NAME 更改为 kafka 但在我的 docker-compose 上添加消费者和生产者的个人配置工作正常!请查看我的更新部分 之所以需要在同一个docker-compose上添加消费者镜像和生产者镜像,以使这些容器在运行时共享同一个网络。否则,他们无法解析kafka:9092 地址。有一些更好和推荐的方法可以让你想要的所有容器共享同一个网络,但这是最容易测试的方法。 @hqt 我在哪里可以看到使所有容器共享同一个网络的最佳实践或建议?【参考方案2】:

这是一个常见的问题,你需要阅读和理解的权威文档是https://www.confluent.io/blog/kafka-listeners-explained

我正在复制它的 tl;dr: 以供参考:

您需要将 Advertisementd.listeners(或 >KAFKA_ADVERTISED_LISTENERS 如果您使用 Docker >images)设置为外部地址(主机/IP),以便客户端 > 可以正确连接到它。否则,他们会尝试连接 > 内部主机地址,如果无法访问,那么就会出现问题。”

【讨论】:

以上是关于Sprint 启动 kafka Consumer 无法连接到 kafka 容器的主要内容,如果未能解决你的问题,请参考以下文章

kafka启动consumer报java.nio.channels.UnresolvedAddressException

日常填坑1-linux 下kafka启动失败,没有默认节点__consumer_offsets

Kafka 学习笔记之 Kafka0.11之console-producer/console-consumer

Win10 1803 Sprint Creators update Consumer edition的版本记录

Kafka分区分配策略(Partition Assignment Strategy

5.kafka API consumer