如何在 python 中扩展 Kafka 消费者?

Posted

技术标签:

【中文标题】如何在 python 中扩展 Kafka 消费者?【英文标题】:How do I scale Kafka Consumers in python? 【发布时间】:2020-05-26 02:03:36 【问题描述】:

这可能有多个问题,请耐心等待。我仍在寻找使用 Kafka 架构的正确方法。我知道一个主题的分区是按消费者划分的。

究竟什么是消费者?现在,我正在考虑编写一个充当消费者的守护程序 python 进程。当消费者消费来自 Kafka 的消息时,有一个任务是我必须完成的。这是一项艰巨的任务,因此我正在创建同时运行的子任务。我可以在同一台机器上拥有多个消费者(python 脚本)吗?

我正在处理多个微服务,所以每个微服务都有自己的消费者?

当负载增加时,我必须扩展消费者。我想产生一台新机器,充当另一个消费者。但是我只是觉得我在这里做错了,觉得必须有更好的方法。

您能告诉我您是如何根据负载扩展您的消费者的吗?如果我需要增加我的消费者,我是否必须增加我的主题分区?我如何动态地做到这一点?当产生的消息较少时,我可以减少分区吗?最初有多少个分区是理想的?

并请提出一些可遵循的良好做法。

这是我正在使用的消费者脚本

while True:
    message = client.poll(timeout=10)#client is the KafkaConsumer object
    if message is not None:
        if message.error():
            raise KafkaException(message.error())
        else:
            logger.info('recieved topic topic partition partition offset offset key key - value'.format(
                topic=message.topic(),
                partition=message.partition(),
                offset=message.offset(),
                key=message.key(),
                value=message.value()
            ))
            #run task

【问题讨论】:

注意:请将您的帖子限制在一个特定的问题上 【参考方案1】:

我可以在同一台机器上拥有多个消费者(python 脚本)吗?

是的。不过,您也可以使用 Python 线程。

如果您不使用多个主题,则不需要多个消费者。

究竟什么是消费者?

请随意阅读 Apache Kafka 网站...

每个微服务都有自己的消费者?

每个服务是否运行相似的代码?那么是的。

我想生产一台新机器

在一台机器上生成您的应用的新实例。监控 CPU 和内存以及网络负载。在至少其中一台在正常处理下超过 70% 之前,不要购买新机器。

如果我需要增加我的消费者,我是否必须增加我的主题分区?

一般来说,是的。 消费者组中的消费者数量受订阅主题中的分区数量限制。

当产生的消息较少时,我可以减少分区吗?

没有。分区不能减少

当负载增加时,我必须扩展消费者

不一定。增加的负荷是不断上升,还是一波又一波?如果可变,那么您可以让 Kafka 缓冲消息。消费者将尽可能快地进行轮询和处理。

您需要定义您的 SLA,以了解从生产者到达主题后处理消息需要多长时间。

最初有多少个分区是理想的?

这方面的文章有很多,具体看你自己的硬件和应用需求。只需记录每条消息,您就可以拥有数千个分区...

当消费者消费来自 Kafka 的消息时,有一个任务我必须完成

听起来您可能想看看 Celery,而不仅仅是 Kafka。您也可以look at Faust 进行 Kafka 处理

【讨论】:

以上是关于如何在 python 中扩展 Kafka 消费者?的主要内容,如果未能解决你的问题,请参考以下文章

如何利用pykafka远程消费 zookeeper+kafka集群 python脚本

Kafka 是如何实现事务的

.Net Core中的Kafka消费者[关闭]

如何在kafka-python和confluent-kafka之间做出选择

kafka消费组添加消费者

PHP安装Kafka扩展