Sprint 启动 kafka Consumer 无法连接到 kafka 容器
Posted
技术标签:
【中文标题】Sprint 启动 kafka Consumer 无法连接到 kafka 容器【英文标题】:Sprint boot kafka Consumer can not connect to the kafka container 【发布时间】:2020-03-12 20:06:15 【问题描述】:我正在尝试部署 2 个 Spring Boot 应用程序(kafka 生产者和消费者)。当我将 Producer 部署到 docker 时一切正常,但是当我部署时,我的 Consumer 不起作用,因为没有与 kafka 容器的连接。
日志显示这个错误
2019-11-17 05:32:22.644 WARN 1 --- [main] o.a.k.c.NetworkClient: [Consumer clientId=consumer-1, groupId=exampleGroup] Connection to node -1 could not be established. Broker may not be available.
我的 docker-compose.yml 是
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
restart: always
ports:
- 2181:2181
kafka:
image: wurstmeister/kafka
container_name: kafka
restart: always
ports:
- 9092:9092
depends_on:
- zookeeper
links:
- zookeeper:zookeeper
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "topic1:1:1"
在我的 KafkaConfig 课程中:
@EnableKafka
@Configuration
public class KafkaConfig
@Bean
public ConsumerFactory<String, String> consumerFactory()
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.KAFKA_BROKERS);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "exampleGroup");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, KafkaConstants.ENABLE_AUTO_COMMIT_CONFIG);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, KafkaConstants.OFFSET_RESET_EARLIER);
// config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, KafkaConstants.SESSION_TIMEOUT_MS);
return new DefaultKafkaConsumerFactory<>(config);
还有常量类
public class KafkaConstants
public static String KAFKA_BROKERS = "localhost:9092";
public static Integer MESSAGE_COUNT=1000;
public static String TOPIC_NAME="demo";
public static String GROUP_ID_CONFIG="exampleGroup";
public static Integer MAX_NO_MESSAGE_FOUND_COUNT=100;
public static String OFFSET_RESET_LATEST="latest";
public static String OFFSET_RESET_EARLIER="earliest";
public static Integer MAX_POLL_RECORDS=1;
public static Integer SESSION_TIMEOUT_MS = 180000;
public static Integer REQUEST_TIMEOUT_MS_CONFIG = 181000;
public static String ENABLE_AUTO_COMMIT_CONFIG = "false";
public static Integer AUTO_COMMIT_INTERVAL_MS_CONFIG = 8000;
当我在我的计算机上安装 zookepper 和 kafka 并使用 intellij 运行这 2 个 spring boot 应用程序时工作正常。问题是当我部署到本地 docker 时。
你能帮帮我吗?
更新
更新我的 docker-compose:
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
restart: always
ports:
- 2181:2181
kafka:
image: wurstmeister/kafka
container_name: kafka
restart: always
ports:
- 9092:9092
depends_on:
- zookeeper
links:
- zookeeper:zookeeper
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "ACC_GROUP_CREATE:1:1"
consumer:
image: micro1
container_name: micro1
depends_on:
- kafka
restart: always
ports:
- 8088:8088
depends_on:
- kafka
links:
- kafka:kafka
producer:
image: micro2
container_name: micro2
depends_on:
- kafka
restart: always
ports:
- 8087:8087
depends_on:
- kafka
links:
- kafka:kafka
工作正常!基于@hqt 的响应,但我不知道为什么我需要添加这些消费者/生产者行
【问题讨论】:
尝试通过端口提供代理 IP 地址 什么意思?在 KafkaConfig 类上,我已经指定了主机和端口 如果您将代码“部署到 docker”,则不能使用 localhost 作为 Kafka 连接,因为那是容器本身,而不是 kafka @cricket_007 我的 sprint 启动应用程序或 docker 容器的配置应该是什么,因为我也尝试在 BOOTSTRAP_SERVERS_CONFIG 上将主机/端口设置为 0.0.0.0:9092 并且有相同的连接问题跨度> 好吧,0.0.0.0 不是有效的 IP 地址。 kafka 容器名为kafka
,因此您的应用应连接到 kafka:9092
【参考方案1】:
KAFKA_ADVERTISED_HOST_NAME
属性导致的问题。这是documentation,它解释了为什么 Kafka 需要广告地址。
关键是当你运行一个客户端时,你传递给它的代理 只是它要去的地方并获取有关经纪人的元数据 集群来自。 它将连接到的实际主机和 IP 读取/写入数据基于代理传回的数据 在那个初始连接中——即使它只是一个节点并且 返回的代理与连接的代理相同。
当您将KAFKA_ADVERTISED_HOST_NAME
设置为本地主机时:
在容器环境中运行 Web 应用程序时,将 KAFKA_ADVERTISED_HOST_NAME
属性更新为 kafka
将起作用。请注意,您的 Web 应用和 kafka 容器都必须在 docker 的网络上。
这是使用 Wurstmeister 的镜像运行 Kafka 集群的建议 docker-compose。
version: "2"
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- 2181:2181
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- 9092:9092
depends_on:
- zookeeper
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ADVERTISED_PORT: 9092
KAFKA_CREATE_TOPICS: "topic1:1:1"
web_app:
# your definition of the web_app goes here
然后就可以连接到容器环境内地址kafka:9092
上的Kafka broker了。
【讨论】:
localhost 和 127.0.0.1 是一回事,那为什么会这样呢?公布的主机名应该是 kafka @cricket_007 我的错。在输入 docker-compose 部分的答案时。我在想他无法连接到主机环境上的kafka,这是不正确的。感谢您指出这个错误。 @hqt 将KAFKA_ADVERTISED_HOST_NAME
更改为 kafka 但在我的 docker-compose 上添加消费者和生产者的个人配置工作正常!请查看我的更新部分
之所以需要在同一个docker-compose上添加消费者镜像和生产者镜像,以使这些容器在运行时共享同一个网络。否则,他们无法解析kafka:9092
地址。有一些更好和推荐的方法可以让你想要的所有容器共享同一个网络,但这是最容易测试的方法。
@hqt 我在哪里可以看到使所有容器共享同一个网络的最佳实践或建议?【参考方案2】:
这是一个常见的问题,你需要阅读和理解的权威文档是https://www.confluent.io/blog/kafka-listeners-explained
我正在复制它的 tl;dr: 以供参考:
您需要将 Advertisementd.listeners(或 >KAFKA_ADVERTISED_LISTENERS 如果您使用 Docker >images)设置为外部地址(主机/IP),以便客户端 > 可以正确连接到它。否则,他们会尝试连接 > 内部主机地址,如果无法访问,那么就会出现问题。”
【讨论】:
以上是关于Sprint 启动 kafka Consumer 无法连接到 kafka 容器的主要内容,如果未能解决你的问题,请参考以下文章
kafka启动consumer报java.nio.channels.UnresolvedAddressException
日常填坑1-linux 下kafka启动失败,没有默认节点__consumer_offsets
Kafka 学习笔记之 Kafka0.11之console-producer/console-consumer
Win10 1803 Sprint Creators update Consumer edition的版本记录