kafka-python 消费者未收到消息
Posted
技术标签:
【中文标题】kafka-python 消费者未收到消息【英文标题】:kafka-python consumer not receiving messages 【发布时间】:2016-05-15 01:03:03 【问题描述】:我在使用 KafaConsumer
使其从头开始读取或从任何其他显式偏移量读取时遇到问题。
为同一主题的消费者运行命令行工具,我确实看到带有--from-beginning
选项的消息,否则它会挂起
$ ./kafka-console-consumer.sh --zookeeper localhost:port --topic topic_name --from-beginning
如果我通过 python 运行它,它会挂起,我怀疑这是由不正确的消费者配置引起的
consumer = KafkaConsumer(topic_name,
bootstrap_servers=['localhost:9092'],
group_id=None,
auto_commit_enable=False,
auto_offset_reset='smallest')
print "Consuming messages from the given topic"
for message in consumer:
print "Message", message
if message is not None:
print message.offset, message.value
print "Quit"
输出:
使用来自给定主题的消息 (之后挂起)
我使用的是 kafka-python 0.9.5,代理运行的是 kafka 8.2。不确定确切的问题是什么。
按照 dpkp 的建议设置 _group_id=None_ 以模拟控制台使用者的行为。
【问题讨论】:
我最近下载了 kafka 包并尝试了你的代码,它对我有用。你能显示你的consumer.properties
内容文件吗?
***.com/questions/34684410/… 你可能需要设置起始偏移量...
也试过设置起始偏移量,也没有用。
我正在使用具有多个分区的主题对其进行测试,碰巧只有当生产者没有产生足够的消息以致所有分区中至少有一条消息时才会出现问题。 issues.apache.org/jira/browse/KAFKA-3159 如果所有分区都至少有一条消息,则消费者可以正常工作。
此外,KafkaConsumer 不会为不支持的编解码器抛出异常,因为我使用的是消费者尚不支持的 lz4,因此它没有解码消息也没有抛出异常。
【参考方案1】:
console-consumer 和您发布的python 消费者代码之间的区别在于,python 消费者使用消费者组来保存偏移量:group_id="test-consumer-group"
。相反,如果您设置 group_id=None,您应该会看到与控制台使用者相同的行为。
【讨论】:
是的,这是问题之一。实际问题是生产者使用 lz4 作为 python 消费者不支持的压缩类型,它在没有警告/错误的情况下退出。 在 kafka-python 1.0 中添加了 LZ4 支持;最新版本也不应该再因压缩错误而静默失败。 这种错误行为(对于单节点/分区卡夫卡)让我旋转了很长一段时间,除非我找到了这个答案【参考方案2】:auto_offset_reset='earliest' 帮我解决了。
【讨论】:
【参考方案3】:auto_offset_reset='earliest'
和 group_id=None
为我解决了这个问题。
【讨论】:
【参考方案4】:我的看法是:打印并确保偏移量符合您的预期。使用position()
和seek_to_beginning()
,请看代码中的cmets。
我无法解释:
-
为什么在实例化
KafkaConsumer
之后,没有分配分区,这是设计的吗? Hack around 是在seek_to_beginning()
之前调用poll()
一次
为什么有时在seek_to_beginning()
之后,第一次调用poll()
不会返回任何数据并且不会更改偏移量。
代码:
import kafka
print(kafka.__version__)
from kafka import KafkaProducer, KafkaConsumer
from time import sleep
KAFKA_URL = 'localhost:9092' # kafka broker
KAFKA_TOPIC = 'sida3_sdtest_topic' # topic name
# ASSUMING THAT the topic exist
# write to the topic
producer = KafkaProducer(bootstrap_servers=[KAFKA_URL])
for i in range(20):
producer.send(KAFKA_TOPIC, ('msg' + str(i)).encode() )
producer.flush()
# read from the topic
# auto_offset_reset='earliest', # auto_offset_reset is needed when offset is not found, it's NOT what we need here
consumer = KafkaConsumer(KAFKA_TOPIC,
bootstrap_servers=[KAFKA_URL],
max_poll_records=2,
group_id='sida3'
)
# (!?) wtf, why we need this to get partitions assigned
# AssertionError: No partitions are currently assigned if poll() is not called
consumer.poll()
consumer.seek_to_beginning()
# also AssertionError: No partitions are currently assigned if poll() is not called
print('partitions of the topic: ',consumer.partitions_for_topic(KAFKA_TOPIC))
from kafka import TopicPartition
print('before poll() x2: ')
print(consumer.position(TopicPartition(KAFKA_TOPIC, 0)))
print(consumer.position(TopicPartition(KAFKA_TOPIC, 1)))
# (!?) sometimes the first call to poll() returns nothing and doesnt change the offset
messages = consumer.poll()
sleep(1)
messages = consumer.poll()
print('after poll() x2: ')
print(consumer.position(TopicPartition(KAFKA_TOPIC, 0)))
print(consumer.position(TopicPartition(KAFKA_TOPIC, 1)))
print('messages: ', messages)
输出:
2.0.1
partitions of the topic: 0, 1
before poll() x2:
0
0
after poll() x2:
0
2
messages: TopicPartition(topic='sida3_sdtest_topic', partition=1): [ConsumerRecord(topic='sida3_sdtest_topic', partition=1, offset=0, timestamp=1600335075864, timestamp_type=0, key=None, value=b'msg0', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=4, serialized_header_size=-1), ConsumerRecord(topic='sida3_sdtest_topic', partition=1, offset=1, timestamp=1600335075864, timestamp_type=0, key=None, value=b'msg1', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=4, serialized_header_size=-1)]
【讨论】:
【参考方案5】:我遇到了同样的问题:我可以在 kafka 控制台中接收,但无法使用包 kafka-python
使用 python 脚本获取消息。
最后我发现原因是我没有在我的producer.py
中调用producer.flush()
和producer.close()
,而documentation 中没有提到。
【讨论】:
【参考方案6】:我之前遇到过同样的问题,所以我在运行代码的机器上本地运行 kafka-topics 进行测试,我得到了 UnknownHostException。我在hosts
文件中添加了 IP 和主机名,它在 kafka-topics 和代码中都运行良好。
似乎KafkaConsumer
试图获取消息但没有引发任何异常而失败。
【讨论】:
以上是关于kafka-python 消费者未收到消息的主要内容,如果未能解决你的问题,请参考以下文章
kafka-python消费者读取数据时自定义偏移量,自定义数据读取的顺序