python 实现 kakfa 的 生产消费模式 和 发布订阅模式

Posted wjq310

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 实现 kakfa 的 生产消费模式 和 发布订阅模式相关的知识,希望对你有一定的参考价值。

python 实现 kakfa 的 生产消费模式 和 发布订阅模式(已安装好 kafka 的情况下)
生产者 producer_demo.py
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import kafka_errors
import traceback
import json
def producer_demo():
    # 假设生产的消息为键值对(不是一定要键值p对),且序列化方式为json
    producer = KafkaProducer(
        bootstrap_servers=[\'localhost:9092\'],
        key_serializer=lambda k: json.dumps(k).encode(),
        value_serializer=lambda v: json.dumps(v).encode())
    # 发送三条消息
    for i in range(0, 3):
        future = producer.send(
            \'kafka_demo\',
            key=\'count_num\',  # 同一个key值,会被送至同一个分区
            value=str(i),
            partition=0)  # 向分区1发送消息
        print("send {}".format(str(i)))
        try:
            future.get(timeout=10) # 监控是否发送成功
        except kafka_errors:  # 发送失败抛出kafka_errors
            traceback.format_exc()
if __name__ == "__main__":
    producer_demo()
 
消费者 consumer_demo1.py
from kafka import KafkaConsumer
import json
def consumer_demo():
    consumer = KafkaConsumer(
        \'kafka_demo\',
        bootstrap_servers=[\'localhost:9092\'],
        group_id=\'test\'
    )
    for message in consumer:
        print("receive, key: {}, value: {}".format(
            json.loads(message.key.decode()),
            json.loads(message.value.decode())
            )
        )
if __name__ == "__main__":
    consumer_demo()

 

消费者 consumer_demo2.py

from kafka import KafkaConsumer
import json
def consumer_demo():
    consumer = KafkaConsumer(
        \'kafka_demo\',
        bootstrap_servers=[\'localhost:9092\'],
        group_id=\'test\'
    )
    for message in consumer:
        print("receive, key: {}, value: {}".format(
            json.loads(message.key.decode()),
            json.loads(message.value.decode())
            )
        )
if __name__ == "__main__":
    consumer_demo()

 

总结 : kafka 的生产和消费文件的 topic 都是 kafka_demo , 消费者 consumer_demo1.py 和consumer_demo2.py 文件中,
如果 group_id 都为 test 的话,则为 生产消费模式,两个消费者只有一个会消费 topic 的消息;
如果 group_id 不都为 test 的话,则为 发布订阅模式,两个消费者都会消费 topic 的消息。

 
 

 

以上是关于python 实现 kakfa 的 生产消费模式 和 发布订阅模式的主要内容,如果未能解决你的问题,请参考以下文章

python实现生产者和消费者模式

用Python多线程实现生产者消费者模式爬取斗图网的表情图片

python生产者和消费者模式实现

kakfa从入门到放弃: 相关概念,幂等性&事务

kakfa从入门到放弃: 相关概念,幂等性&事务

kakfa从入门到放弃: 相关概念,幂等性&事务