在 dockerized 环境中无法从 Flask 连接到 Kafka

Posted

技术标签:

【中文标题】在 dockerized 环境中无法从 Flask 连接到 Kafka【英文标题】:Cannot connect to Kafka from Flask in a dockerized environement 【发布时间】:2019-04-28 21:30:57 【问题描述】:

我正在尝试构建一个以 Kafka 作为界面的 Flask 应用程序。我使用了 Python 连接器 kafka-python 和 Kafka 的 Docker 映像 spotify/kafkaproxy。

下面是 docker-compose 文件。

version: '3.3'
services:
  kafka:
    image: spotify/kafkaproxy
    container_name: kafka_dev
    ports:
      - '9092:9092'
      - '2181:2181'
    environment:
      - ADVERTISED_HOST=0.0.0.0
      - ADVERTISED_PORT=9092
      - CONSUMER_THREADS=1
      - TOPICS=PROFILE_CREATED,IMG_RATED
      - ZK_CONNECT=kafka7zookeeper:2181/root/path
  flaskapp:
    build: ./flask-app
    container_name: flask_dev
    ports:
      - '9000:5000'
    volumes:
      - ./flask-app:/app
    depends_on:
      - kafka

下面是我用来连接kafka的Python sn-p。在这里,我使用 Kafka 容器的别名 kafka 进行连接,因为 Docker 会负责将别名映射到它的 IP 地址。

from kafka import KafkaConsumer, KafkaProducer

TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = ['kafka:9092']

consumer = KafkaConsumer(TOPICS, bootstrap_servers=BOOTSTRAP_SERVERS)

我收到NoBrokersAvailable 错误。由此,我可以理解 Flask 应用找不到 Kafka 服务器。

Traceback (most recent call last):
  File "./app.py", line 11, in <module>
    consumer = KafkaConsumer("PROFILE_CREATED", bootstrap_servers=BOOTSTRAP_SERVERS)
  File "/usr/local/lib/python3.6/site-packages/kafka/consumer/group.py", line 340, in __init__
    self._client = KafkaClient(metrics=self._metrics, **self.config)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 219, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 819, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable

其他观察:

    我能够从 Flask 容器运行 ping kafka 并从 Kafka 容器获取数据包。 当我在本地运行 Flask 应用程序时,尝试通过设置 BOOTSTRAP_SERVERS = ['localhost:9092'] 连接到 Kafka 容器,它工作正常。

【问题讨论】:

对于一种可能的解决方案,请在此处查看我的答案 ***.com/a/50525419/3224238 如果不需要从 docker-compose 外部连接,您可以在 kafka 服务定义中添加 hostname: kafka 并在 @987654333 中使用它@ 同样,您的flaskapp 应该配置为连接到kafka:9292 建议:编辑 Python 代码以从环境变量中提取引导服务器 【参考方案1】:

更新

正如 cricket_007 所提到的,鉴于您正在使用下面提供的 docker-compose,您应该使用 kafka:29092 从另一个容器连接到 Kafka。所以你的代码看起来像这样:

from kafka import KafkaConsumer, KafkaProducer

TOPICS = ['PROFILE_CREATED', 'IMG_RATED']
BOOTSTRAP_SERVERS = ['kafka:29092']

consumer = KafkaConsumer(TOPICS, bootstrap_servers=BOOTSTRAP_SERVERS)

结束更新

我建议你使用来自Confluent Inc 的 Kafka 图像,它们有各种使用 docker-compose 的示例设置,可以随时使用,并且它们总是在更新它们。

试试这个:

---
version: '2'
services:
zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_TICK_TIME: 2000

kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
    - zookeeper
    ports:
    - 9092:9092
    environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

flaskapp:
    build: ./flask-app
    container_name: flask_dev
    ports:
    - '9000:5000'
    volumes:
    - ./flask-app:/app

我使用了这个docker-compose.yml 并在顶部添加了您的服务 请注意:

此处使用的配置为 external 与代理的连接公开端口 9092,即来自 docker 网络的 外部 连接。这可能来自运行 docker 的主机,或者如果您有更复杂的设置,也可能来自更远的地方。如果后者为真,您需要将 KAFKA_ADVERTISED_LISTENERS 中的值 'localhost' 更改为可从这些远程客户端解析到 docker 主机的值

请务必查看其他示例,它们可能对您有用,尤其是在迁移到生产环境时:https://github.com/confluentinc/cp-docker-images/tree/5.0.1-post/examples

也值得检查:

看来您需要指定 api_version 以避免此错误。更多详情请查看here。

这个库的 1.3.5 版本(pypy 上的最新版本)仅列出了某些 API 版本 0.8.0 到 0.10.1。因此,除非您将 api_version 显式指定为 (0, 10, 1),否则客户端库尝试发现版本将导致 NoBrokersAvailable 错误。

producer = KafkaProducer(
    bootstrap_servers=URL,
    client_id=CLIENT_ID,
    value_serializer=JsonSerializer.serialize,
    api_version=(0, 10, 1)
)

这应该可以工作,有趣的是,设置 api_version 会意外地解决问题:

当您设置 api_version 时,客户端将不会尝试探查代理以获取版本信息。所以是探测操作失败了。版本探测连接和一般连接之间的一个很大区别是,前者只尝试在每个连接(每个代理)上连接一个接口,而后者——一般操作——将不断循环通过所有接口,直到连接成功。 #1411 通过切换版本探测逻辑以尝试在所有找到的接口上建立连接来解决此问题。

实际问题描述here

【讨论】:

感谢您的及时回复,@lloiacono。不幸的是,即使使用 confluentinc/cp-kafka,问题仍然存在。 @Shashank 你遇到同样的错误吗?您是否更改了上面发布的 Python sn-p?您能否进入烧瓶容器并执行以下命令:nc -vz kafka 9092 请注意,您可能需要安装 netcat。 不,我没有更改 Python sn-p。在 Flask 容器内运行命令 nc -vz kafka 1-10000 后,我得到以下输出。 kafka [192.168.80.2] 2181 open kafka [192.168.80.2] 9092 open. @Shashank 在 Flask 容器中,您需要将kafka:29092 与上述用于 Kafka 的 Compose 文件一起使用 当我为生产者和消费者尝试api_version=(0, 10, 1) 时,问题转移到KafkaTimeoutError: Failed to update metadata after 60.0 secs. 我确保设置BOOTSTRAP_SERVERS = ['kafka:29092'] 并在不同的迭代中尝试PLAINTEXT_HOST://localhost:9092。然后我认为问题可能是因为没有明确提及 TOPICS。我参考了docs 并在WHITELIST 中添加了主题,但无法使其正常工作。【参考方案2】:

我设法在所有服务之间使用名为 stream_net 的 network 来启动并运行它。

# for local development
version: "3.7"
services:

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - stream_net

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    networks:
      - stream_net

  flaskapp:
    build: ./flask-app
    container_name: flask_dev
    ports:
      - "9000:5000"
    volumes:
      - ./flask-app:/app
    networks:
      - stream_net
    depends_on:
      - kafka

networks:
  stream_net:
localhost:9092 上的容器外部连接 kafka:29092 网络内的连接

当然,将所有已经在网络中运行的容器放在网络中是很奇怪的。但是通过这种方式,容器可以用它们的实际名称命名。也许有人可以准确地解释它是如何工作的,或者它可以帮助其他人理解问题的核心并正确解决它。

【讨论】:

以上是关于在 dockerized 环境中无法从 Flask 连接到 Kafka的主要内容,如果未能解决你的问题,请参考以下文章

使用 EBS 部署多容器 docker 环境(flask 和 nginx)

docker 部署 flask配置环境及测试

Python Flask SQLAlchemy 容器无法连接到 MySQL 容器

无法在 Docker 中为 Flask 应用程序运行 pytest\requests 测试

手把手教你用 Flask,Docker 和 Kubernetes 部署Python机器学习模型(附代码)

uwsgi flask 在python3环境下配置