一对kafka消息生产者-消息消费者,python

Posted zhangphil

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一对kafka消息生产者-消息消费者,python相关的知识,希望对你有一定的参考价值。

一对kafka消息生产者-消息消费者,python

消息生产者:

import datetime
import random

from kafka import KafkaProducer
import json

from loguru import logger
import kafka_config


def on_send_success(record_metadata=None):
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)


def on_send_error(excp=None):
    logger.error('失败', exc_info=excp)


if __name__ == '__main__':
    msg_producer = KafkaProducer(bootstrap_servers=f'kafka_config.server_host:kafka_config.server_port',
                                 value_serializer=lambda v: json.dumps(v).encode('utf-8'))

    val = 
        'event_type': 0,
        'msg': random.randint(0, 100),
        'time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
    

    future = msg_producer \\
        .send(topic=r'some_topic', key=b'some_key', value=json.dumps(val)) \\
        .add_callback(on_send_success) \\
        .add_errback(on_send_error)

    msg_producer.flush()

    result = future.get(timeout=10)
    print(result)

消息消费者:

import json

from kafka import KafkaConsumer

import kafka_config

if __name__ == '__main__':
    consumer = KafkaConsumer(bootstrap_servers=[f'kafka_config.server_host:kafka_config.server_port'],
                             group_id='my_group')
    conn = consumer.bootstrap_connected()
    if conn:
        print('连接OK')

    topics = consumer.topics()
    for t in topics:
        print(t)

    consumer.subscribe(topics=['some_topic'])

    for msg in consumer:
        print(msg.topic, json.loads(msg.value))
        print("%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value))

kafka配置文件,kafka_config.py:

server_host = '127.0.0.1'
server_port = 9092

先启动消费者,然后生产者再发送消息。Kafka的broker配置参考:

kafka连接zookeeper配置搭建环境、安装和启动,Windows环境下_zhangphil的博客-CSDN博客_kafka zookeeper配置kafka,分布式集群架构下,高性能的流式事件数据(主要是消息)集成、发布(生产)和订阅(分发、消费)组件(中间件)。kafka依赖zooeeper(数据后端),这里有Windows下安装配置启动zookeeper的 文章(1):Windows 10环境zookeeper单机伪集群部署和配置_Zhang Phil-CSDN博客Windows 10环境zookeeper单机伪集群部署和配置1,首先到zookeeper项目主页地址下载项目包,https://archive.apache.org/disthttps://blog.csdn.net/zhangphil/article/details/123086790

以上是关于一对kafka消息生产者-消息消费者,python的主要内容,如果未能解决你的问题,请参考以下文章

大数据消息队列--Kafka概括

kafka-3python生产者和消费者实用demo

不接收融合kafka简单生产者/消费者示例的消息?

kafka——消费者原理解析

Kafka学习

Kafka从入门到实战