python multiprocessing 独立启动和关闭进程
Posted
技术标签:
【中文标题】python multiprocessing 独立启动和关闭进程【英文标题】:python multiprocessing start and close processes independently 【发布时间】:2021-06-27 00:29:04 【问题描述】:我正在尝试使用多处理运行 tensorflow 推理。每个进程使用 1 个 GPU。我有一个文件列表 input_files[]。每个进程获取一个文件,在其上运行 model.predict 并将结果写入文件。要继续下一个文件,我需要关闭进程并重新启动它。这是因为 tensorflow 不会释放内存。所以如果我使用相同的进程,我会得到内存泄漏。
我在下面编写了一个有效的代码。我启动 5 个进程,关闭它们并启动另一个 5。问题是所有进程都需要等待最慢的进程才能继续。如何独立于其他进程启动和关闭每个进程?
请注意,Pool.map 位于 input_files_small 而不是 input_files。
file1 --> start new process --> run prediction --> close process --> file2 --> start new process --> etc.
for i in range(0, len(input_files), num_process):
input_files_small = input_files[i:i+num_process]
try:
process_pool = multiprocessing.Pool(processes=num_process, initializer=init_worker, initargs=(gpu_ids))
pool_output = process_pool.map(worker_fn, input_files_small)
finally:
process_pool.close()
process_pool.join()
【问题讨论】:
在创建pool
时使用mastasksperchild
参数,而不是自己尝试...
【参考方案1】:
无需一遍又一遍地重新创建处理池。首先,在创建池时指定 maxtasksperchild=1。这应该会导致为每个提交的新任务创建一个新流程。而不是使用方法map
,而是使用方法map_async
,它不会阻塞。如果您的工作函数未返回您需要的结果,您可以使用pool.close
后跟pool.join()
隐式等待这些提交完成,如下所示或使用第二个代码变体:
process_pool = multiprocessing.Pool(processes=num_process, initializer=init_worker, initargs=(gpu_ids), maxtasksperchild=1)
for i in range(0, len(input_files), num_process):
input_files_small = input_files[i:i+num_process]
process_pool.map_async(worker_fn, input_files_small))
# wait for all outstanding tasks to complete
process_pool.close()
process_pool.join()
如果您需要来自worker_fn
的返回值:
process_pool = multiprocessing.Pool(processes=num_process, initializer=init_worker, initargs=(gpu_ids), maxtasksperchild=1)
results = []
for i in range(0, len(input_files), num_process):
input_files_small = input_files[i:i+num_process]
results.append(process_pool.map_async(worker_fn, input_files_small))
# get return values from map_async
pool_outputs = [result.get() for result in results]
# you do not need process_pool.close() and process_pool.join()
但是,由于在稍后调用 map_async
的任务启动时,之前调用 map_async
的某些“慢”任务仍在运行,因此其中一些任务可能仍需要等待才能运行。但至少池中的所有进程都应该保持相当繁忙。
如果您期望工作函数出现异常并需要在主进程中处理它们,那么情况会变得更加复杂。
【讨论】:
谢谢。这似乎可以完成这项工作。我认为我不再需要 input_files_small 了。我可以将整个 input_files 列表提供给异步映射。 唯一需要注意的是进程需要在进程完成时将释放的 gpu 添加回队列。这是为了让下一个进程可以使用释放的 gpu,所以队列需要在进程之间共享。以上是关于python multiprocessing 独立启动和关闭进程的主要内容,如果未能解决你的问题,请参考以下文章
Python多进程运行——Multiprocessing基础教程2