python生产消费Kafka

Posted Mars.wang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python生产消费Kafka相关的知识,希望对你有一定的参考价值。

python生产消费Kafka主要是跟KafkaConsumer和KafkaProducer两个类打交道.

from kafka import KafkaProducer, KafkaConsumer


class PythonKafka(object):
    def __init__(self, topic=None, username=None, password=None):
        self.topic = topic
        self.username = username
        self.password = password
        self.count = 0
        self.error_count = 0

    def on_send_success(self, *args, **kwargs):
        self.count = self.count + 1

    def on_send_error(self, *args, **kwargs):
        self.error_count = self.error_count + 1

    def get_kafka_producer(self):
        kafka_producer = KafkaProducer(bootstrap_servers=[ip1:port, ip2:port, ip3:port],
                                       security_protocol=SASL_PLAINTEXT,
                                       sasl_mechanism=PLAIN,
                                       sasl_plain_username=username,
                                       sasl_plain_password=password,
                                       request_timeout_ms=40000,
                                       api_version=(0, 10, 1))
        return kafka_producer

    def get_kafka_consumer(self):
        # 指定超时时间,不然会一直阻塞
        consumer_params = {
            security_protocol: SASL_PLAINTEXT,
            sasl_mechanism: PLAIN,
            group_id: test_group2,
            sasl_plain_username: username,
            sasl_plain_password: password,
            auto_offset_reset: earliest,
            api_version: (0, 10),
            consumer_timeout_ms: 10000
        }

        return KafkaConsumer(self.topic,
                             bootstrap_servers=[ip1:port, ip2:port, ip3:port],
                             **consumer_params)

    # 异步发送
    def send_data(self, json_str):
        json_bytes = str.encode(json_str)
        producer = self.get_kafka_producer()
        producer.send(self.topic, value=json_bytes).add_callback(self.on_send_success).add_errback(
            self.on_send_error)
        producer.flush()
        producer.close()

    # 接收数据
    def receive_data(self):
        consumer = self.get_kafka_consumer()
        for msg in consumer:
            msg_str = str(msg.value, encoding=utf8)
            print(msg_str)
        consumer.close()

 

以上是关于python生产消费Kafka的主要内容,如果未能解决你的问题,请参考以下文章

Kafka 通过python简单的生产消费实现

一对kafka消息生产者-消息消费者,python

SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段

Kafka 简单实验二(Python实现简单生产者消费者)

kafka-3python生产者和消费者实用demo

Python 基于Python结合pykafka实现kafka生产及消费速率&主题分区偏移实时监控