无法使用 python-kafka 消费消息

Posted

技术标签:

【中文标题】无法使用 python-kafka 消费消息【英文标题】:Unable to consume messages using python-kafka 【发布时间】:2019-09-23 22:42:25 【问题描述】:

我试图复制blog 中给出的步骤。在尝试时,给出了Kafka ConsumerKafka Producer python 代码,我可以在 python 交互式终端中运行代码,并且消费者控制台能够提供输出,但是如果我将它们传递到 python 文件(*.py) 中,它不消耗任何东西。

消费者

from kafka import KafkaConsumer
consumer = KafkaConsumer('sample')
for message in consumer:
    print (message)

制片人

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('sample', b'Hello, World!')
producer.send('sample', key=b'message-two', value=b'This is Kafka-Python')

如何让它在 python 文件中工作?

【问题讨论】:

它会引发任何异常吗?还是默默地失败了? @knh190 消费者窗口(运行后)保持不动,生产者立即结束。 你能用this post建议的命令行验证消息是真的生成的吗? 或者this gist更有帮助。 @knh190 python 程序无法生成消息。运行 producer.py 后,主题中不会创建新消息 【参考方案1】:

我刚刚将 producer.flush() 添加到生产者代码中,它开始工作了。

因为 Kafka 客户端批量发送消息,而不是立即减少代理的负载。

您最初没有发送足够的数据以使其自行发生刷新,因此您的数据在您的应用程序结束时仅位于内存中。

参考batch.size生产者属性

【讨论】:

以上是关于无法使用 python-kafka 消费消息的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ消费者无法消费

即使在成功连接并在 kafka 消费者控制台中获取消息后,也无法使用来自 kafka 主题的消息(使用 Python)[重复]

activemq设置使用内存数和配置自动删除无法消费的消息以及过期的消息

IBM MQ 消费者应用程序无法使用文本消息(JMSCMQ1049: The character set '1208(UTF-8) Unmappable Action: REPORT)

Kafka-python 客户端导致的 cpu 使用过高,且无法消费消息的问题

rocketmq控制台跳过堆积是啥意思