如何让 Python 多线程管道使用 90% 的可用内存?

Posted

技术标签:

【中文标题】如何让 Python 多线程管道使用 90% 的可用内存?【英文标题】:How can I make a Python multithreaded pipeline use 90% of available memory? 【发布时间】:2021-11-26 08:04:51 【问题描述】:

我的 Python 脚本在 GCE 实例中运行,将 Pub/Sub 作为输入(异步拉取订阅)和输出。

据我了解,这样我可以控制并发线程的数量,从而限制使用的内存量。如果我将max_messages 设置为 100,我的脚本最终会耗尽内存。

from google.cloud import pubsub_v1
from concurrent import futures

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project, subscription_name)

def callback(message):
        print (str(message.data) + " " + str(threading.current_thread()))
        message.ack()
flow_control = pubsub_v1.types.FlowControl(max_messages=10)
executor = futures.ThreadPoolExecutor(max_workers=5)
policy = pubsub_v1.subscriber.policy.thread.Policy(subscriber, subscription_path, executor=executor, flow_control=flow_control)
policy.open(callback)

在我看来,硬编码工人和消息的数量是控制内存利用率的一种原始方法。有没有更好的方法让我的脚本在 VM 资源允许的情况下分配尽可能多的线程以尽可能高效地利用它?

【问题讨论】:

您使用的是什么 GCE 虚拟机(资源)?您是否遵循任何文档?这是你自己的代码还是你基于任何东西?有关您的环境和可能负载的一些详细信息? 您是否考虑过在 PubSub 前面使用带有 Cloud Run 或 Cloud Functions 的推送订阅?它将根据您的 PubSub 流量进行扩展和缩减,因此您将仅使用处理消息所需的资源。不浪费资源。 @PjoterS 我使用 e2-highcpu-16 抢占式虚拟机。我使用 GCP 文档中的“异步拉取”方法,policy 除外。 @guillaumeblaquiere 使用 Cloud Functions 推送订阅会更加昂贵且非常有限,因为 GCF 资源分配只有一些非常基本的配置。 Compute Engine 更加灵活且成本更低。 我不同意,但这是一个观点问题。使用 Cloud Function,您可以在区域内获得高可用性,并且您无需执行补丁管理(以及其他管理系统内容)。是的,每小时的CPU更贵,但如果只考虑前2项的成本,我不确定VM是否更便宜! 【参考方案1】:

一些解决方案

    内存限制

    Google 云容器实例已允许内存限制。默认情况下,内存限制为 512 MiB。更新内存使用限制的要求是 2vCPU。这对您来说不是问题,因为您使用的是 e2-highcpu-16 抢占式 VM,它提供多达 32 个 CPU。

    您可以在 GCP 控制台、命令行或 YAML 文件中更新内存限制。可配置的最大内存为 8Gi。如果您需要确定需要多少内存,您可以使用(Standing Memory) + (Memory per Request) * (Service Concurrency) 进行计算。更多信息请点击reference documentation

    CPU 密集型任务: 我不确定您线程中的任务是受 IO 限制还是受 CPU 限制。如果它的 CPU 受限,您可以选择使用ProcessPoolExecutor

    ProcessPoolExecutor 类是一个 Executor 子类,它使用进程池来异步执行调用。

    max_workers 参数上的ProcessPoolExecutor documentation 引号。

    如果max_workers 为无,则选择的默认值最多为 61

    IO 绑定任务

    IO 绑定任务非常适合在线程中使用。 根据使用的python版本,max_workers的默认值会有所不同。

    3.5 版:max_workers 参数未给出或设置为无,no of processors on machine *5(考虑到任务与 I/O 相关而不是 CPU 相关) 3.8 版:max_workers 的默认值更改为min(32, os.cpu_count() + 4)。此默认值为 I/O 绑定任务保留至少 5 个工作人员。对于释放 GIL 的 CPU 绑定任务,它最多使用 32 个 CPU 内核。

    由于您使用的ThreadPoolExecutormax_workers 值设置为5,如果您使用python 版本>3.5 不明确设置max_workers 值将允许最大化线程分配。请参考ThreadPoolExecutor documentation

    对于 FlowControl:您可以尝试使用 documentation 中提到的其他参数。 FlowControl 设置用于控制使用异步订阅拉取消息的速率。

    class google.cloud.pubsub_v1.types.FlowControl(max_bytes=104857600, max_messages=1000, max_lease_duration=3600, max_duration_per_lease_extension=0)
    

    max_bytes 暂停消息流之前已接收但尚未处理的消息的最大总大小。

    max_duration_per_lease_extension 单次租约延长尝试的最大时间量(以秒为单位)。如果订阅者未能延长截止日期,则限制消息重新传递之前的延迟。必须在 10 到 600(含)之间。如果设置为 0,则忽略。

    max_lease_duration 在将消息从租约管理中删除之前,保持消息租约的最长时间(以秒为单位)。

    max_messages 暂停消息流之前已接收但尚未处理的最大消息数。

【讨论】:

以上是关于如何让 Python 多线程管道使用 90% 的可用内存?的主要内容,如果未能解决你的问题,请参考以下文章

python多线程--管道交互

Python线程队列与多处理管道

Python - 并发编程

如何判断有另一个线程试图持有互斥锁?

在 Python 中使用管道文件描述符时如何检查 EOF?

如何让通用 Webhook 触发器插件与 Jenkins 中的多分支管道一起使用?