python multiprocessing 独立启动和关闭进程

Posted

技术标签:

【中文标题】python multiprocessing 独立启动和关闭进程【英文标题】:python multiprocessing start and close processes independently 【发布时间】:2021-06-27 00:29:04 【问题描述】:

我正在尝试使用多处理运行 tensorflow 推理。每个进程使用 1 个 GPU。我有一个文件列表 input_files[]。每个进程获取一个文件,在其上运行 model.predict 并将结果写入文件。要继续下一个文件,我需要关闭进程并重新启动它。这是因为 tensorflow 不会释放内存。所以如果我使用相同的进程,我会得到内存泄漏。

我在下面编写了一个有效的代码。我启动 5 个进程,关闭它们并启动另一个 5。问题是所有进程都需要等待最慢的进程才能继续。如何独立于其他进程启动和关闭每个进程?

请注意,Pool.map 位于 input_files_small 而不是 input_files。

file1 --> start new process --> run prediction --> close process --> file2 --> start new process --> etc.


for i in range(0, len(input_files), num_process):
    input_files_small = input_files[i:i+num_process]
    try:
        process_pool = multiprocessing.Pool(processes=num_process, initializer=init_worker, initargs=(gpu_ids))
        pool_output = process_pool.map(worker_fn, input_files_small)
    finally:
        process_pool.close()
        process_pool.join()

【问题讨论】:

在创建pool 时使用mastasksperchild 参数,而不是自己尝试... 【参考方案1】:

无需一遍又一遍地重新创建处理池。首先,在创建池时指定 maxtasksperchild=1。这应该会导致为每个提交的新任务创建一个新流程。而不是使用方法map,而是使用方法map_async,它不会阻塞。如果您的工作函数未返回您需要的结果,您可以使用pool.close 后跟pool.join() 隐式等待这些提交完成,如下所示或使用第二个代码变体:

process_pool = multiprocessing.Pool(processes=num_process, initializer=init_worker, initargs=(gpu_ids), maxtasksperchild=1)
for i in range(0, len(input_files), num_process):
    input_files_small = input_files[i:i+num_process]
    process_pool.map_async(worker_fn, input_files_small))
# wait for all outstanding tasks to complete
process_pool.close()
process_pool.join()

如果您需要来自worker_fn 的返回值:

process_pool = multiprocessing.Pool(processes=num_process, initializer=init_worker, initargs=(gpu_ids), maxtasksperchild=1)
results = []
for i in range(0, len(input_files), num_process):
    input_files_small = input_files[i:i+num_process]
    results.append(process_pool.map_async(worker_fn, input_files_small))
# get return values from map_async
pool_outputs = [result.get() for result in results]
# you do not need process_pool.close() and process_pool.join()

但是,由于在稍后调用 map_async 的任务启动时,之前调用 map_async 的某些“慢”任务仍在运行,因此其中一些任务可能仍需要等待才能运行。但至少池中的所有进程都应该保持相当繁忙。

如果您期望工作函数出现异常并需要在主进程中处理它们,那么情况会变得更加复杂。

【讨论】:

谢谢。这似乎可以完成这项工作。我认为我不再需要 input_files_small 了。我可以将整个 input_files 列表提供给异步映射。 唯一需要注意的是进程需要在进程完成时将释放的 gpu 添加回队列。这是为了让下一个进程可以使用释放的 gpu,所以队列需要在进程之间共享。

以上是关于python multiprocessing 独立启动和关闭进程的主要内容,如果未能解决你的问题,请参考以下文章

Python多进程运行——Multiprocessing基础教程2

113 python程序中的进程操作-进程间数据共享(multiProcess.Manger)

我的 Python 多进程显然不是独立的

python进程线程:共享内存

python-多任务编程01-进程

python多进程multiprocessing