一对kafka消息生产者-消息消费者,python
Posted zhangphil
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一对kafka消息生产者-消息消费者,python相关的知识,希望对你有一定的参考价值。
一对kafka消息生产者-消息消费者,python
消息生产者:
import datetime
import random
from kafka import KafkaProducer
import json
from loguru import logger
import kafka_config
def on_send_success(record_metadata=None):
print(record_metadata.topic)
print(record_metadata.partition)
print(record_metadata.offset)
def on_send_error(excp=None):
logger.error('失败', exc_info=excp)
if __name__ == '__main__':
msg_producer = KafkaProducer(bootstrap_servers=f'kafka_config.server_host:kafka_config.server_port',
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
val =
'event_type': 0,
'msg': random.randint(0, 100),
'time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
future = msg_producer \\
.send(topic=r'some_topic', key=b'some_key', value=json.dumps(val)) \\
.add_callback(on_send_success) \\
.add_errback(on_send_error)
msg_producer.flush()
result = future.get(timeout=10)
print(result)
消息消费者:
import json
from kafka import KafkaConsumer
import kafka_config
if __name__ == '__main__':
consumer = KafkaConsumer(bootstrap_servers=[f'kafka_config.server_host:kafka_config.server_port'],
group_id='my_group')
conn = consumer.bootstrap_connected()
if conn:
print('连接OK')
topics = consumer.topics()
for t in topics:
print(t)
consumer.subscribe(topics=['some_topic'])
for msg in consumer:
print(msg.topic, json.loads(msg.value))
print("%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value))
kafka配置文件,kafka_config.py:
server_host = '127.0.0.1'
server_port = 9092
先启动消费者,然后生产者再发送消息。Kafka的broker配置参考:
以上是关于一对kafka消息生产者-消息消费者,python的主要内容,如果未能解决你的问题,请参考以下文章