kafka-3python生产者和消费者实用demo
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka-3python生产者和消费者实用demo相关的知识,希望对你有一定的参考价值。
程序分为productor.py是发送消息端,consumer为消费消息端,
启动的时候先启动product再启动consumer,毕竟只有发了消息,消费端才有消息可以消费,
productor.py
#!/usr/bin/env python2.7 #_*_coding: utf-8 _*_ from kafka import KafkaProducer kafka_host = ‘192.168.1.200‘ # kafka服务器地址 kafka_port = 9092 # kafka服务器的端口 producer = KafkaProducer(bootstrap_servers=[‘{kafka_host}:{kafka_port}‘.format( kafka_host = kafka_host, kafka_port = kafka_port )]) #简单for循环10次,发送10条消息 for i in range(1,10): message_string = ‘some message‘.format(i) #调用send方法,发送名字为‘topic1‘的topicid ,发送的消息为message_string response = producer.send(‘topic1‘, message_string.encode(‘utf-8‘)) print response
consumer.py
#!/usr/bin/env python #_*_coding: utf-8 _*_ import json from kafka import * kafka_host = ‘192.168.1.200‘ # kafka服务器地址 kafka_port = 9092 # kafka服务器端口 #消费topic1的topic,并指定group_id(自定义),多个机器或进程想顺序消费,可以指定同一个group_id, # 如果想一条消费多次消费,可以换一个group_id,会从头开始消费 consumer = KafkaConsumer( ‘topic1‘, group_id = ‘my-group‘, bootstrap_servers = [‘{kafka_host}:{kafka_port}‘.format(kafka_host=kafka_host, kafka_port=kafka_port)] ) for message in consumer: #json读取kafka的消息 content = json.loads(message.value) print content
本文出自 “马鹏飞——著” 博客,请务必保留此出处http://mapengfei.blog.51cto.com/1552412/1926068
以上是关于kafka-3python生产者和消费者实用demo的主要内容,如果未能解决你的问题,请参考以下文章
翻译:《实用的Python编程》06_04_More_generators