如何让 Python 多线程管道使用 90% 的可用内存?
Posted
技术标签:
【中文标题】如何让 Python 多线程管道使用 90% 的可用内存?【英文标题】:How can I make a Python multithreaded pipeline use 90% of available memory? 【发布时间】:2021-11-26 08:04:51 【问题描述】:我的 Python 脚本在 GCE 实例中运行,将 Pub/Sub 作为输入(异步拉取订阅)和输出。
据我了解,这样我可以控制并发线程的数量,从而限制使用的内存量。如果我将max_messages
设置为 100,我的脚本最终会耗尽内存。
from google.cloud import pubsub_v1
from concurrent import futures
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project, subscription_name)
def callback(message):
print (str(message.data) + " " + str(threading.current_thread()))
message.ack()
flow_control = pubsub_v1.types.FlowControl(max_messages=10)
executor = futures.ThreadPoolExecutor(max_workers=5)
policy = pubsub_v1.subscriber.policy.thread.Policy(subscriber, subscription_path, executor=executor, flow_control=flow_control)
policy.open(callback)
在我看来,硬编码工人和消息的数量是控制内存利用率的一种原始方法。有没有更好的方法让我的脚本在 VM 资源允许的情况下分配尽可能多的线程以尽可能高效地利用它?
【问题讨论】:
您使用的是什么 GCE 虚拟机(资源)?您是否遵循任何文档?这是你自己的代码还是你基于任何东西?有关您的环境和可能负载的一些详细信息? 您是否考虑过在 PubSub 前面使用带有 Cloud Run 或 Cloud Functions 的推送订阅?它将根据您的 PubSub 流量进行扩展和缩减,因此您将仅使用处理消息所需的资源。不浪费资源。 @PjoterS 我使用 e2-highcpu-16 抢占式虚拟机。我使用 GCP 文档中的“异步拉取”方法,policy
除外。
@guillaumeblaquiere 使用 Cloud Functions 推送订阅会更加昂贵且非常有限,因为 GCF 资源分配只有一些非常基本的配置。 Compute Engine 更加灵活且成本更低。
我不同意,但这是一个观点问题。使用 Cloud Function,您可以在区域内获得高可用性,并且您无需执行补丁管理(以及其他管理系统内容)。是的,每小时的CPU更贵,但如果只考虑前2项的成本,我不确定VM是否更便宜!
【参考方案1】:
一些解决方案
内存限制
Google 云容器实例已允许内存限制。默认情况下,内存限制为 512 MiB。更新内存使用限制的要求是 2vCPU。这对您来说不是问题,因为您使用的是 e2-highcpu-16 抢占式 VM,它提供多达 32 个 CPU。
您可以在 GCP 控制台、命令行或 YAML 文件中更新内存限制。可配置的最大内存为 8Gi。如果您需要确定需要多少内存,您可以使用(Standing Memory) + (Memory per Request) * (Service Concurrency)
进行计算。更多信息请点击reference documentation
CPU 密集型任务:
我不确定您线程中的任务是受 IO 限制还是受 CPU 限制。如果它的 CPU 受限,您可以选择使用ProcessPoolExecutor
。
ProcessPoolExecutor 类是一个 Executor 子类,它使用进程池来异步执行调用。
max_workers
参数上的ProcessPoolExecutor
documentation 引号。
如果
max_workers
为无,则选择的默认值最多为 61
IO 绑定任务
IO 绑定任务非常适合在线程中使用。
根据使用的python版本,max_workers
的默认值会有所不同。
max_workers
参数未给出或设置为无,no of processors on machine *5
(考虑到任务与 I/O 相关而不是 CPU 相关)
3.8 版:max_workers
的默认值更改为min(32, os.cpu_count() + 4)
。此默认值为 I/O 绑定任务保留至少 5 个工作人员。对于释放 GIL 的 CPU 绑定任务,它最多使用 32 个 CPU 内核。
由于您使用的ThreadPoolExecutor
和max_workers
值设置为5,如果您使用python 版本>3.5 不明确设置max_workers
值将允许最大化线程分配。请参考ThreadPoolExecutor
documentation
对于 FlowControl:您可以尝试使用 documentation 中提到的其他参数。 FlowControl
设置用于控制使用异步订阅拉取消息的速率。
class google.cloud.pubsub_v1.types.FlowControl(max_bytes=104857600, max_messages=1000, max_lease_duration=3600, max_duration_per_lease_extension=0)
max_bytes
暂停消息流之前已接收但尚未处理的消息的最大总大小。
max_duration_per_lease_extension
单次租约延长尝试的最大时间量(以秒为单位)。如果订阅者未能延长截止日期,则限制消息重新传递之前的延迟。必须在 10 到 600(含)之间。如果设置为 0,则忽略。
max_lease_duration
在将消息从租约管理中删除之前,保持消息租约的最长时间(以秒为单位)。
max_messages
暂停消息流之前已接收但尚未处理的最大消息数。
【讨论】:
以上是关于如何让 Python 多线程管道使用 90% 的可用内存?的主要内容,如果未能解决你的问题,请参考以下文章