kafka-python消息读写操作kafka,python,Windows

Posted zhangphil

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka-python消息读写操作kafka,python,Windows相关的知识,希望对你有一定的参考价值。

kafka-python消息读写操作kafka,python,Windows环境

(1)安装python支持库

pip install kafka-python

(2)先把kafka起起来,

kafka连接zookeeper配置搭建环境、安装和启动,Windows环境下_zhangphil的博客-CSDN博客_kafka zookeeper配置kafka,分布式集群架构下,高性能的流式事件数据(主要是消息)集成、发布(生产)和订阅(分发、消费)组件(中间件)。kafka依赖zooeeper(数据后端),这里有Windows下安装配置启动zookeeper的 文章(1):Windows 10环境zookeeper单机伪集群部署和配置_Zhang Phil-CSDN博客Windows 10环境zookeeper单机伪集群部署和配置1,首先到zookeeper项目主页地址下载项目包,https://archive.apache.org/disthttps://blog.csdn.net/zhangphil/article/details/123086790

然后写好一个消息消费者,等待消息生产者生产消息:

import json

from kafka import KafkaConsumer

if __name__ == '__main__':
    consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
    consumer.subscribe(topics=['fly_topic'])
    conn = consumer.bootstrap_connected()
    if conn:
        print('连接OK')

    ts = consumer.topics()
    print(ts)

    for msg in consumer:
        print(msg)
        print(msg.topic, json.loads(msg.value))

(3)消息生产者:

import datetime
import random

from kafka import KafkaProducer
import json

if __name__ == '__main__':
    producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092',
                             value_serializer=lambda v: json.dumps(v).encode('utf-8'))

    val = 
        'msg': random.randint(0, 100),
        'time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
    
    producer.send(topic='fly_topic', key=b'fly_key', value=json.dumps(val))

    producer.close()

消息生产者运行后,在(2)里面的消费者就会收到消息。

以上是关于kafka-python消息读写操作kafka,python,Windows的主要内容,如果未能解决你的问题,请参考以下文章

Kafka-python 客户端导致的 cpu 使用过高,且无法消费消息的问题

使用Python读写Kafka

如何在kafka-python和confluent-kafka之间做出选择

Kafka 入门--安装配置和 kafka-python 调用

测开之路七十四:python处理kafka

Kafka SASL/PLAIN加密 及Kafka-Python整合