04_kafka python客户端_Producer模拟

Posted shayzhang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了04_kafka python客户端_Producer模拟相关的知识,希望对你有一定的参考价值。

使用的python库: kafka-python

安装方式: pip install kafka-python

 

简单的模拟Producer

"""
Kafka Producer Test
using kafka-python library
"""
# -*- encoding: utf-8 -*-
# Author: shayzhang@sina.com

# import KafkaProducer class
from kafka import KafkaProducer
# import KafkaError class
from kafka.errors import KafkaTimeoutError
# time for message timestamp
import time


def main():
    # 创建producer实例,并传入bootstrap_servers列表(brokers), 修改producer实例配置
    producer = KafkaProducer(bootstrap_servers=["192.168.229.100:9092", "192.168.229.101:9092", "192.168.229.102:9092"])

    # topic to be published
    topic = \'ctopic\'

    # message value to be published, must be bytes type
    msg = bytes(\'hello_from_python\', encoding=\'utf-8\')
    # for python2:  msg = b\'hello_from_python\'

    # message key, must be bytes type
    # used to determine which partition the message will be stored
    key = bytes(\'shay\', encoding=\'utf-8\')
    # for python2:  key = b\'shay\'

    # Async send, default
    try:

        # get partitions for the topic
        partition_set = producer.partitions_for(topic)
        for e in partition_set:
            print("Partition: " + str(e))
            # print \'Partition: \'+ str(e)

        future = producer.send(topic, msg, key, partition=None, timestamp_ms=time.time())
        # block until all records are sent to cluster
        producer.flush()

        print("Message Send!")
        # print "Message send!"
    except  KafkaTimeoutError:
        print("Kafka Timeout")
        # print("Kafka Timeout")


if __name__ == \'__main__\':
    main()

 

在集群上任选1个节点,开启console-consumer,  运行该py文件

Consumer收到该数据

 

以上是关于04_kafka python客户端_Producer模拟的主要内容,如果未能解决你的问题,请参考以下文章

错误:通过 Python 增加 MaxLocksPerFile 注册表项

python confluent kafka客户端-无法使用SSL访问GKE上的Kafka

Kafka Consumer(Python threading)

kafka的Java客户端示例代码(kafka_2.11-0.8.2.2)

python 消费kafka 写入es 小记

kafka的Java客户端示例代码(kafka_2.12-0.10.2.1)