如何在不关闭的情况下运行 python 多处理池

Posted

技术标签:

【中文标题】如何在不关闭的情况下运行 python 多处理池【英文标题】:How to run a python multiprocessing pool without closing 【发布时间】:2021-10-20 06:07:41 【问题描述】:

我正在尝试同时运行多个 Bert 模型副本。

我有一个包含池的 python 对象:

self.tokenizer = BertTokenizer.from_pretrained(BERT_LARGE)
self.model = BertForQuestionAnswering.from_pretrained(BERT_LARGE)
self.pool = Pool(processes=max_processes,
                 initializer=pool_init, 
                 initargs=(self.model, self.tokenizer))

池中的每个进程都通过 Bert 标记器和模型进行复制:

process_model = None
process_tokenizer = None

def pool_init(m: BertForQuestionAnswering, t: BertTokenizer):
    global process_model, process_tokenizer
    process_model, process_tokenizer = m, t

要使用池,然后我运行

while condition:
    answers = self.pool.map(answer_func, questions)
    condition = check_condition(answers)

这种设计是为了避免每次初始化池时将Bert模型重新加载到每个进程中的大开销(每个进程大约需要1.5-2秒)。

问题 1。这是最好的方法吗?

问题 2。如果是这样,我应该什么时候打电话给self.pool.close()self.pool.join()?我想在check_condition() 函数之前join(),但我真的不想close() 池(除非直到对象的__del__())但是在调用join() 之前调用close() 给了我错误,并且调用 close() 会使池在将来无法调用。池只是不适合这类工作,我应该管理一系列进程吗?帮助...?

谢谢!!

【问题讨论】:

【参考方案1】:

您说,“这种设计是为了避免每次初始化池时将 Bert 模型重新加载到每个进程中的巨大开销(每个进程大约需要 1.5-2 秒)。”你的陈述和你展示的少量代码对我来说并不完全合理。我认为这是一个术语问题。

首先,我看不到池在哪里被多次初始化;我只看到一个创建池的实例:

self.pool = Pool(processes=max_processes,
                 initializer=pool_init, 
                 initargs=(self.model, self.tokenizer))

但是,如果您要多次创建池,则 实际上是在您当前的设计中使用pool_init 函数在每次池被重新加载到池的每个进程中时创造而不是回避你说你正在回避的东西。但这可能是件好事。所以我怀疑我们在谈论两件不同的事情。所以我只能解释实际发生了什么:

由于您的while condition: 循环,您可能会多次调用pool.map 函数。但是,一般来说,如果可以避免多次创建池,您确实希望避免这样做。 现在我想到了两个原因,可以像您一样使用 Pool 构造函数的 initializerinitargs 参数::: p>

    如果您的工作函数(在您的情况下为answer_func)需要访问只读数据项,而不是在每次调用该函数时传递这些项,初始化全局变量通常更便宜池中具有这些数据项的每个进程,并让您的工作函数只访问全局变量。 某些数据类型,例如multiprocessing.Lock 实例,不能使用任何multiprocessing.Pool 方法作为参数传递,需要使用池初始化函数“传递”。

案例 2 似乎不适用。因此,如果您有 100 个问题和 8 个池大小,最好通过模型和标记器 8 次,池中的每个进程一次,而不是 100 次,每个问题一次。

如果您使用Pool.map 方法,该方法会阻塞直到所有提交任务完成,您可以确定当该方法返回时池中没有进程正在运行任何任务。 如果您将重新执行池创建代码,那么当您终止 while condition: 循环时,您应该通过调用 pool.close() 后跟 pool.join() 来释放资源,这将等待池中的进程终止,或者您可以调用pool.terminate(),它会立即终止所有池进程(我们知道此时它们处于空闲状态)。如果你只创建一次池,你真的不需要调用任何东西;池中的进程是守护进程,它将在您的主进程终止时终止。但是,如果您将在不再需要池之后进行进一步处理,那么为了尽快释放资源,您应该执行前面描述的“清理”。

这有意义吗?

补充说明

由于pool.map 会阻塞直到所有提交任务完成,所以无需调用pool.join() 来确保任务已完成; pool.map 将返回您的工作函数返回的所有返回值的列表。 answer_func.

pool.join() 可能有用的地方,除了我已经提到的释放资源之外,当您发出一个或多个 pool.apply_async 方法调用时。此方法是非阻塞的,并返回一个AsyncResult 实例,您稍后可以在该实例上发出get 调用来阻塞以完成任务并获取返回值。但是如果您对返回值不感兴趣并且只需要等待任务完成,那么只要您不再需要向池提交任何任务,您只需发出@987654342 @ 后跟 pool.join(),在这两个调用完成时,您可以确定所有提交的任务都已完成(可能有例外)。

因此,在类的 __del__ 方法中调用 pool.terminate() 对于一般用法来说是个好主意。

【讨论】:

以上是关于如何在不关闭的情况下运行 python 多处理池的主要内容,如果未能解决你的问题,请参考以下文章

Python多处理:如何在异常时关闭多处理池

如何在不关闭应用程序的情况下处理异常? [关闭]

如何在程序启动后在不打开控制台的情况下从批处理文件运行程序?

如何使用 Python Ray 在不耗尽内存的情况下并行处理大量数据?

我想为具有多任务处理功能的 android 开发一个音乐播放器应用程序,这样我就可以在不关闭音乐播放器的情况下使用不同的应用程序......帮助 plzz [关闭]

如何在不关闭 TCP 连接的情况下关闭处理 TCP 请求的线程?