04_kafka python客户端_Producer模拟
Posted shayzhang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了04_kafka python客户端_Producer模拟相关的知识,希望对你有一定的参考价值。
使用的python库: kafka-python
安装方式: pip install kafka-python
简单的模拟Producer
""" Kafka Producer Test using kafka-python library """ # -*- encoding: utf-8 -*- # Author: shayzhang@sina.com # import KafkaProducer class from kafka import KafkaProducer # import KafkaError class from kafka.errors import KafkaTimeoutError # time for message timestamp import time def main(): # 创建producer实例,并传入bootstrap_servers列表(brokers), 修改producer实例配置 producer = KafkaProducer(bootstrap_servers=["192.168.229.100:9092", "192.168.229.101:9092", "192.168.229.102:9092"]) # topic to be published topic = \'ctopic\' # message value to be published, must be bytes type msg = bytes(\'hello_from_python\', encoding=\'utf-8\') # for python2: msg = b\'hello_from_python\' # message key, must be bytes type # used to determine which partition the message will be stored key = bytes(\'shay\', encoding=\'utf-8\') # for python2: key = b\'shay\' # Async send, default try: # get partitions for the topic partition_set = producer.partitions_for(topic) for e in partition_set: print("Partition: " + str(e)) # print \'Partition: \'+ str(e) future = producer.send(topic, msg, key, partition=None, timestamp_ms=time.time()) # block until all records are sent to cluster producer.flush() print("Message Send!") # print "Message send!" except KafkaTimeoutError: print("Kafka Timeout") # print("Kafka Timeout") if __name__ == \'__main__\': main()
在集群上任选1个节点,开启console-consumer, 运行该py文件
Consumer收到该数据
以上是关于04_kafka python客户端_Producer模拟的主要内容,如果未能解决你的问题,请参考以下文章
错误:通过 Python 增加 MaxLocksPerFile 注册表项
python confluent kafka客户端-无法使用SSL访问GKE上的Kafka
Kafka Consumer(Python threading)