在 dockerized 环境中无法从 Flask 连接到 Kafka
Posted
技术标签:
【中文标题】在 dockerized 环境中无法从 Flask 连接到 Kafka【英文标题】:Cannot connect to Kafka from Flask in a dockerized environement 【发布时间】:2019-04-28 21:30:57 【问题描述】:我正在尝试构建一个以 Kafka 作为界面的 Flask 应用程序。我使用了 Python 连接器 kafka-python 和 Kafka 的 Docker 映像 spotify/kafkaproxy。
下面是 docker-compose 文件。
version: '3.3'
services:
kafka:
image: spotify/kafkaproxy
container_name: kafka_dev
ports:
- '9092:9092'
- '2181:2181'
environment:
- ADVERTISED_HOST=0.0.0.0
- ADVERTISED_PORT=9092
- CONSUMER_THREADS=1
- TOPICS=PROFILE_CREATED,IMG_RATED
- ZK_CONNECT=kafka7zookeeper:2181/root/path
flaskapp:
build: ./flask-app
container_name: flask_dev
ports:
- '9000:5000'
volumes:
- ./flask-app:/app
depends_on:
- kafka
下面是我用来连接kafka的Python sn-p。在这里,我使用 Kafka 容器的别名 kafka
进行连接,因为 Docker 会负责将别名映射到它的 IP 地址。
from kafka import KafkaConsumer, KafkaProducer
TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = ['kafka:9092']
consumer = KafkaConsumer(TOPICS, bootstrap_servers=BOOTSTRAP_SERVERS)
我收到NoBrokersAvailable
错误。由此,我可以理解 Flask 应用找不到 Kafka 服务器。
Traceback (most recent call last):
File "./app.py", line 11, in <module>
consumer = KafkaConsumer("PROFILE_CREATED", bootstrap_servers=BOOTSTRAP_SERVERS)
File "/usr/local/lib/python3.6/site-packages/kafka/consumer/group.py", line 340, in __init__
self._client = KafkaClient(metrics=self._metrics, **self.config)
File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 219, in __init__
self.config['api_version'] = self.check_version(timeout=check_timeout)
File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 819, in check_version
raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
其他观察:
-
我能够从 Flask 容器运行
ping kafka
并从 Kafka 容器获取数据包。
当我在本地运行 Flask 应用程序时,尝试通过设置 BOOTSTRAP_SERVERS = ['localhost:9092']
连接到 Kafka 容器,它工作正常。
【问题讨论】:
对于一种可能的解决方案,请在此处查看我的答案 ***.com/a/50525419/3224238 如果不需要从docker-compose
外部连接,您可以在 kafka 服务定义中添加 hostname: kafka
并在 @987654333 中使用它@ 同样,您的flaskapp 应该配置为连接到kafka:9292
建议:编辑 Python 代码以从环境变量中提取引导服务器
【参考方案1】:
更新
正如 cricket_007 所提到的,鉴于您正在使用下面提供的 docker-compose,您应该使用 kafka:29092
从另一个容器连接到 Kafka。所以你的代码看起来像这样:
from kafka import KafkaConsumer, KafkaProducer
TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = ['kafka:29092']
consumer = KafkaConsumer(TOPICS, bootstrap_servers=BOOTSTRAP_SERVERS)
结束更新
我建议你使用来自Confluent Inc 的 Kafka 图像,它们有各种使用 docker-compose 的示例设置,可以随时使用,并且它们总是在更新它们。
试试这个:
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
flaskapp:
build: ./flask-app
container_name: flask_dev
ports:
- '9000:5000'
volumes:
- ./flask-app:/app
我使用了这个docker-compose.yml 并在顶部添加了您的服务 请注意:
此处使用的配置为 external 与代理的连接公开端口 9092,即来自 docker 网络的 外部 连接。这可能来自运行 docker 的主机,或者如果您有更复杂的设置,也可能来自更远的地方。如果后者为真,您需要将 KAFKA_ADVERTISED_LISTENERS 中的值 'localhost' 更改为可从这些远程客户端解析到 docker 主机的值
请务必查看其他示例,它们可能对您有用,尤其是在迁移到生产环境时:https://github.com/confluentinc/cp-docker-images/tree/5.0.1-post/examples
也值得检查:
看来您需要指定 api_version 以避免此错误。更多详情请查看here。
这个库的 1.3.5 版本(pypy 上的最新版本)仅列出了某些 API 版本 0.8.0 到 0.10.1。因此,除非您将 api_version 显式指定为 (0, 10, 1),否则客户端库尝试发现版本将导致 NoBrokersAvailable 错误。
producer = KafkaProducer(
bootstrap_servers=URL,
client_id=CLIENT_ID,
value_serializer=JsonSerializer.serialize,
api_version=(0, 10, 1)
)
这应该可以工作,有趣的是,设置 api_version 会意外地解决问题:
当您设置 api_version 时,客户端将不会尝试探查代理以获取版本信息。所以是探测操作失败了。版本探测连接和一般连接之间的一个很大区别是,前者只尝试在每个连接(每个代理)上连接一个接口,而后者——一般操作——将不断循环通过所有接口,直到连接成功。 #1411 通过切换版本探测逻辑以尝试在所有找到的接口上建立连接来解决此问题。
实际问题描述here
【讨论】:
感谢您的及时回复,@lloiacono。不幸的是,即使使用 confluentinc/cp-kafka,问题仍然存在。 @Shashank 你遇到同样的错误吗?您是否更改了上面发布的 Python sn-p?您能否进入烧瓶容器并执行以下命令:nc -vz kafka 9092
请注意,您可能需要安装 netcat。
不,我没有更改 Python sn-p。在 Flask 容器内运行命令 nc -vz kafka 1-10000
后,我得到以下输出。 kafka [192.168.80.2] 2181 open kafka [192.168.80.2] 9092 open
.
@Shashank 在 Flask 容器中,您需要将kafka:29092
与上述用于 Kafka 的 Compose 文件一起使用
当我为生产者和消费者尝试api_version=(0, 10, 1)
时,问题转移到KafkaTimeoutError: Failed to update metadata after 60.0 secs.
我确保设置BOOTSTRAP_SERVERS = ['kafka:29092']
并在不同的迭代中尝试PLAINTEXT_HOST://localhost:9092
。然后我认为问题可能是因为没有明确提及 TOPICS。我参考了docs 并在WHITELIST
中添加了主题,但无法使其正常工作。【参考方案2】:
我设法在所有服务之间使用名为 stream_net
的 network 来启动并运行它。
# for local development
version: "3.7"
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
networks:
- stream_net
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
networks:
- stream_net
flaskapp:
build: ./flask-app
container_name: flask_dev
ports:
- "9000:5000"
volumes:
- ./flask-app:/app
networks:
- stream_net
depends_on:
- kafka
networks:
stream_net:
localhost:9092
上的容器外部连接
kafka:29092
网络内的连接
当然,将所有已经在网络中运行的容器放在网络中是很奇怪的。但是通过这种方式,容器可以用它们的实际名称命名。也许有人可以准确地解释它是如何工作的,或者它可以帮助其他人理解问题的核心并正确解决它。
【讨论】:
以上是关于在 dockerized 环境中无法从 Flask 连接到 Kafka的主要内容,如果未能解决你的问题,请参考以下文章
使用 EBS 部署多容器 docker 环境(flask 和 nginx)
Python Flask SQLAlchemy 容器无法连接到 MySQL 容器
无法在 Docker 中为 Flask 应用程序运行 pytest\requests 测试