即使在成功连接并在 kafka 消费者控制台中获取消息后,也无法使用来自 kafka 主题的消息(使用 Python)[重复]

Posted

技术标签:

【中文标题】即使在成功连接并在 kafka 消费者控制台中获取消息后,也无法使用来自 kafka 主题的消息(使用 Python)[重复]【英文标题】:Unable to consume messages from kafka topic (using Python) even after making successful connections and getting messages in a kafka consumer console [duplicate] 【发布时间】:2019-09-17 18:58:22 【问题描述】:

我有一个使用 Kafka 的 Debezium 设置。如文档中所述,我可以使用来自 kafka 控制台的消息。但是,当我在本地使用 Python 创建 kafka 消费者时,我无法使用消息。需要注意的是,kafka 控制台工作正常!

我尝试调查此问题,但无法找到类似的环境/情况

我要连接的python代码是:

from kafka import KafkaConsumer
consumer = KafkaConsumer('dbserver1.inventory.customers', group_id='my-group', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest')
for message in consumer:
    print(message)

无论现有消息或推送到此主题的新消息如何,这都会变为空白。

我确信消息存在,因为当我打开控制台使用者时,我可以看到消息。

只是为了清楚整个设置: 对于每个步骤(最后一步除外),我都遵循了这个 (https://github.com/debezium/debezium-examples/tree/master/tutorial#using-mongodb) 文档。 一切正常,但我的 Python 代码。 我还尝试使用kafka:9092 引导服务器创建消费者,但最终出现错误:

kafka.errors.NoBrokersAvailable: NoBrokersAvailable

我的本​​地是 Mac OS。

仅供参考: 我可以获得其他所有内容,例如主题:

>>> consumer = KafkaConsumer('dbserver1.inventory.customers', group_id='my-group', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest')
>>> consumer.topics()
'my_connect_offsets', 'my_connect_configs', 'dbserver1.inventory.orders', 'connect-status', 'dbserver1.inventory.customers', 'dbserver1.inventory.products'

我正在通过命令启动消费者:

docker-compose -f debezium-mongodb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic dbserver1.inventory.customers

【问题讨论】:

您能否发布您的控制台消费者,您可以使用它成功地消费消息。 @GokulPotluri,完成! 【参考方案1】:

如果没有看到您的撰写文件,localhost:9092 可能无法在基于您的 docker 命令的 Python 代码中运行

    如果您的 Python 代码不在容器中运行,则需要从不同的端口读取。如果在容器中运行,则必须使用kafka:9092 您使用的端口取决于容器的广告侦听器

Connect to Kafka running in Docker from local machine

【讨论】:

broker 是localhost:9092 时怎么会列出新的主题?至于docker文件:github.com/debezium/debezium-examples/tree/master/… 主题列表来自Zookeeper,不使用Consumer API协议。我没有使用 debezium/kafka 图像,所以我不知道他们的网络是如何配置的,但是您的 Compose 文件没有将广告主机设置为 docker 网络之外的任何东西 github.com/debezium/docker-images/tree/master/kafka/… 我敢打赌,如果你将您的 Python 代码放入容器中,然后使用 kafka:9092 作为连接字符串,它可能会开始工作。 否则,欢迎您阅读此博客以获取更多详细信息,而不是在这里再次解释rmoff.net/2018/08/02/kafka-listeners-explained 注意:您根本不需要 debezium/kafka 图像...您可以将其换成任何其他 Kafka 图像。 我阅读了rmoff.net/2018/08/02/kafka-listeners-explained 并配置了我的yaml 文件,如下所示(pastebin.com/SeDZ4aiG)我收到一个错误:pastebin.com/6w5CTwGs 如果我的配置是,你能看到 yaml 文件并纠正我吗错了吗?

以上是关于即使在成功连接并在 kafka 消费者控制台中获取消息后,也无法使用来自 kafka 主题的消息(使用 Python)[重复]的主要内容,如果未能解决你的问题,请参考以下文章

即使在 worker.properties 中配置 plugin.path 后,Kafka 也找不到连接器插件

kafka通过控制台模拟消息发送和消息接收正常,但是通过javaAPI操作生产者发送消息不成功 消费者接收不到数据解决方案?

为啥Kafka消费者连接zookeeper,生产者从broker获取元数据?

深入理解Kafka必知必会

kafka内置的zookeeper

Kafka高可用的保证