即使在成功连接并在 kafka 消费者控制台中获取消息后,也无法使用来自 kafka 主题的消息(使用 Python)[重复]
Posted
技术标签:
【中文标题】即使在成功连接并在 kafka 消费者控制台中获取消息后,也无法使用来自 kafka 主题的消息(使用 Python)[重复]【英文标题】:Unable to consume messages from kafka topic (using Python) even after making successful connections and getting messages in a kafka consumer console [duplicate] 【发布时间】:2019-09-17 18:58:22 【问题描述】:我有一个使用 Kafka 的 Debezium 设置。如文档中所述,我可以使用来自 kafka 控制台的消息。但是,当我在本地使用 Python 创建 kafka 消费者时,我无法使用消息。需要注意的是,kafka 控制台工作正常!
我尝试调查此问题,但无法找到类似的环境/情况
我要连接的python代码是:
from kafka import KafkaConsumer
consumer = KafkaConsumer('dbserver1.inventory.customers', group_id='my-group', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest')
for message in consumer:
print(message)
无论现有消息或推送到此主题的新消息如何,这都会变为空白。
我确信消息存在,因为当我打开控制台使用者时,我可以看到消息。
只是为了清楚整个设置:
对于每个步骤(最后一步除外),我都遵循了这个 (https://github.com/debezium/debezium-examples/tree/master/tutorial#using-mongodb) 文档。
一切正常,但我的 Python 代码。
我还尝试使用kafka:9092
引导服务器创建消费者,但最终出现错误:
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
我的本地是 Mac OS。
仅供参考: 我可以获得其他所有内容,例如主题:
>>> consumer = KafkaConsumer('dbserver1.inventory.customers', group_id='my-group', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest')
>>> consumer.topics()
'my_connect_offsets', 'my_connect_configs', 'dbserver1.inventory.orders', 'connect-status', 'dbserver1.inventory.customers', 'dbserver1.inventory.products'
我正在通过命令启动消费者:
docker-compose -f debezium-mongodb.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.customers
【问题讨论】:
您能否发布您的控制台消费者,您可以使用它成功地消费消息。 @GokulPotluri,完成! 【参考方案1】:如果没有看到您的撰写文件,localhost:9092
可能无法在基于您的 docker 命令的 Python 代码中运行
-
如果您的 Python 代码不在容器中运行,则需要从不同的端口读取。如果在容器中运行,则必须使用
kafka:9092
您使用的端口取决于容器的广告侦听器
Connect to Kafka running in Docker from local machine
【讨论】:
broker 是localhost:9092
时怎么会列出新的主题?至于docker文件:github.com/debezium/debezium-examples/tree/master/…
主题列表来自Zookeeper,不使用Consumer API协议。我没有使用 debezium/kafka
图像,所以我不知道他们的网络是如何配置的,但是您的 Compose 文件没有将广告主机设置为 docker 网络之外的任何东西 github.com/debezium/docker-images/tree/master/kafka/… 我敢打赌,如果你将您的 Python 代码放入容器中,然后使用 kafka:9092
作为连接字符串,它可能会开始工作。
否则,欢迎您阅读此博客以获取更多详细信息,而不是在这里再次解释rmoff.net/2018/08/02/kafka-listeners-explained
注意:您根本不需要 debezium/kafka
图像...您可以将其换成任何其他 Kafka 图像。
我阅读了rmoff.net/2018/08/02/kafka-listeners-explained 并配置了我的yaml
文件,如下所示(pastebin.com/SeDZ4aiG)我收到一个错误:pastebin.com/6w5CTwGs 如果我的配置是,你能看到 yaml 文件并纠正我吗错了吗?以上是关于即使在成功连接并在 kafka 消费者控制台中获取消息后,也无法使用来自 kafka 主题的消息(使用 Python)[重复]的主要内容,如果未能解决你的问题,请参考以下文章
即使在 worker.properties 中配置 plugin.path 后,Kafka 也找不到连接器插件
kafka通过控制台模拟消息发送和消息接收正常,但是通过javaAPI操作生产者发送消息不成功 消费者接收不到数据解决方案?