python操作kafka
Posted testerwangzhao
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python操作kafka相关的知识,希望对你有一定的参考价值。
mac启动zookeeper和kafka:zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties
#py2.7版本使用kafka,py3.7版本使用kafka-python
import datetime
import json
import time
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=‘localhost:9092‘)
# send函数传递topic类型为str,value的类型是bytes
future = producer.send(‘hello‘, json.dumps(
{"method": "get", "step": "1", "type": "test", "testName": "kafka",
"cid": "{0}".format(datetime.datetime.now().strftime(‘%Y%m%d%H%M%S‘)),
"info": "demo{}".format(1)}).encode())
record_metadata = future.get(timeout=10)
print( record_metadata, datetime.datetime.now().strftime(‘%Y%m%d%H%M%S‘))
from kafka import KafkaConsumer
consumer = KafkaConsumer(‘hello‘, bootstrap_servers=[‘localhost:9092‘])
for msg in consumer:
recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
print( recv)
以上是关于python操作kafka的主要内容,如果未能解决你的问题,请参考以下文章
如何在kafka-python和confluent-kafka之间做出选择
Python数据库操作 Python操作mysql#学习猿地