不接收融合kafka简单生产者/消费者示例的消息?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了不接收融合kafka简单生产者/消费者示例的消息?相关的知识,希望对你有一定的参考价值。

我运行kafka_2.11-0.10.1.1confluent-kafka-0.9.2(主分支)python绑定使用librdkafka-0.9.2。我的机器运行ubuntu-16.04 x86_64。我在zookeeper-3.4.8-1港口运行2181。我像这样运行汇合的生产者示例:

$ cd confluent-kafka-python/examples
$ python producer.py localhost:9095 confluent-01
first message
2nd msg

和消费者:

$ python consumer.py localhost:9095 confluentgroup confluent-01

一切都在我的机器上本地运行,不运行任何防火墙。

备注:

  • 在Zookeeper上成功创建了该主题
  • 代理成功接收生产者消息:
  • 消费者设置以下conf {'bootstrap.servers': broker, 'group.id': group, 'session.timeout.ms': 6000, 'default.topic.config': {'auto.offset.reset': 'smallest'}, 'api.version.request': True }
  • 在开始producer/consumer工作正常一段时间,直到我得到生产者方面的Receive failed: Disconnected。 Exerpt:

$ python producer.py  localhost:9095 confluent-02
asd
% Message delivered to confluent-02 [0]
1234123
890890
% Message delivered to confluent-02 [0]
%3|1485791262.420|FAIL|rdkafka#producer-1| [thrd:obscura.ax.example.com:9095/3]: obscura.ax.example.com:9095/3: Receive failed: Disconnected

问题:一段时间后,我在消费者方面没有得到任何东西

问题:

  1. 我究竟做错了什么?
  2. 如何验证代理端收到的生产者消息?生成器消息在代理端正确接收。
  3. 如何调试消费者方面?我将'debug': "cgrp, topic, fetch"添加到消费者conf。我在哪里可以阅读日志?
答案

我有两个建议:

1)尝试将选项--from-beginning添加到consumer命令

2)代理的默认端口是9092,因此请检查要使用的端口

希望这可以帮助。

另一答案

我最终把事情搞定了。最初我跑了confluent-kafka tutorial,其中:

  • 不捕获ctrl+c SIGINT信号,
  • poll()没有超时

在消费者代码中。因此,我必须ctrl+z然后kill %1它在我的Linux机器上。我相信这个终止并没有关闭套接字,它保持开放一段时间(TIME_WAIT)。然后,当我重新启动消费者时,它从旧插槽中取出垃圾并卡住了。

我添加try: [...] except KeyboardInterrupt: consumer.close()来抓住ctrl+c并关闭套接字cleany。而且不再面临这个问题。

我希望这将有助于将来的某些人。

以上是关于不接收融合kafka简单生产者/消费者示例的消息?的主要内容,如果未能解决你的问题,请参考以下文章

kafka消息中间件及java示例

(04)使用kafka脚本发送消息和接收消息

记一次生产kafka消息消费的事故

分区数量超过消费者时的 Apache Kafka 消息消费

消息中间件之Kafka

Kafka介绍及安装部署