kafka-python消息读写操作kafka,python,Windows
Posted zhangphil
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka-python消息读写操作kafka,python,Windows相关的知识,希望对你有一定的参考价值。
kafka-python消息读写操作kafka,python,Windows环境
(1)安装python支持库
pip install kafka-python
(2)先把kafka起起来,
然后写好一个消息消费者,等待消息生产者生产消息:
import json
from kafka import KafkaConsumer
if __name__ == '__main__':
consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=['fly_topic'])
conn = consumer.bootstrap_connected()
if conn:
print('连接OK')
ts = consumer.topics()
print(ts)
for msg in consumer:
print(msg)
print(msg.topic, json.loads(msg.value))
(3)消息生产者:
import datetime
import random
from kafka import KafkaProducer
import json
if __name__ == '__main__':
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
val =
'msg': random.randint(0, 100),
'time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
producer.send(topic='fly_topic', key=b'fly_key', value=json.dumps(val))
producer.close()
消息生产者运行后,在(2)里面的消费者就会收到消息。
以上是关于kafka-python消息读写操作kafka,python,Windows的主要内容,如果未能解决你的问题,请参考以下文章
Kafka-python 客户端导致的 cpu 使用过高,且无法消费消息的问题
如何在kafka-python和confluent-kafka之间做出选择