05_Kafka Python_Consumer模拟
Posted shayzhang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了05_Kafka Python_Consumer模拟相关的知识,希望对你有一定的参考价值。
Python客户端: Kafka-python
安装: pip install kafka-python
Consumer端模拟代码
""" Kafka Consumer Test Client: Kafka-Python """ # Consumer from kafka import KafkaConsumer def main(): # A message iterator generated, start reading message from the beginning of the topic consumer = KafkaConsumer("ctopic", group_id="cg-1", bootstrap_servers=["192.168.229.100:9092", "192.168.229.101:9092"], auto_offset_reset=\'earliest\') for msg in consumer: topic = msg.topic partiton = msg.partition # can explained as msg id, starting from 0 offset = msg.offset key = str(msg.key, encoding=\'utf-8\') value = str(msg.value, encoding=\'utf-8\') print("Topic: "+ topic + "From Partion: " + str(partiton)) print("Offset: " + str(offset)) print("Msg key: " + key + " Msg value: " + value) print("*"*20) consumer.close() if __name__ == "__main__": main()
启动该Consumer,并多次运行Producer, 由于消息的key相同,会被存入同一个partition, 从Consumer端解析出的partition也可以确认
以上是关于05_Kafka Python_Consumer模拟的主要内容,如果未能解决你的问题,请参考以下文章
Kafka 如何从 __consumer_offsets 主题中读取