你如何让多处理池不启动新进程但也不终止当前正在运行的进程?
Posted
技术标签:
【中文标题】你如何让多处理池不启动新进程但也不终止当前正在运行的进程?【英文标题】:How do you get multiprocessing Pool to not spin up new processes but also not terminate currently running processes? 【发布时间】:2021-11-20 21:03:22 【问题描述】:我在Python 2.7
中使用Python multiprocessing.Pool
类。我有大量只能在一天中的某个时间段内运行的作业。每项工作都需要一些时间。我想将作业限制为一次最多并行运行 n 个。
池功能可以很好地限制并行作业的数量,但是当我试图结束这些作业时它似乎有问题。当我在窗口结束时,我希望当前正在运行的作业完成它们的处理。我不想开始新的工作。我一直在尝试使用Pool.close()
来执行此操作,它确实让我的运行进程按需要完成,但从实验看来,即使在池之后,队列中但尚未开始处理的作业仍将提交处理已关闭。
另一个选项Pool.terminate()
甚至会主动关闭正在运行的作业,这与预期的行为背道而驰。
Function | Allows running jobs to finish | Prevents new jobs from starting |
---|---|---|
.terminate() | No | Yes |
.close() | Yes | No |
Desired behaviour | Yes | Yes |
【问题讨论】:
只是一个想法,但一种简单的方法可能是让每个作业的开头检查来自主进程的信号或事件;一旦你在运行窗口结束时设置了它,任何正在运行的作业都将继续,因为它们在开始时已经检查了信号,但任何新开始的作业都会立即退出。 (当大量短期进程在短时间内启动时,可能会对性能造成一点影响) 【参考方案1】:首先,您不应该使用 Python2.7,它已经被弃用了一段时间。
您应该使用concurrent.futures
标准库中的ProcessPoolExecutor
并调用.shutdown()
方法并激活cancel_futures
标志,以让执行程序完成已启动的作业,但取消任何待处理的工作。
from concurrent.futures import ProcessPoolExecutor
parallel_jobs = 4 # The pool size
executor = ProcessPoolExecutor(parallel_jobs)
future_1 = executor.submit(work_1, argument_1)
...
future_n = executor.submit(work_n, argument_n)
...
# At some point when the time window ends and you need to stop jobs:
executor.shutdown(cancel_futures=True)
# Some futures such as future_n may have been cancelled here, you can check that:
if future_n.cancelled():
print("job n has been cancelled")
# Or you can try to get the result while checking for CancelledError:
try:
result_n = future_n.result()
except CancelledError:
print("job n hasn't finished in the given time window")
这里是一个取消的例子:
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
from time import sleep
# The job to execute concurrently
def foo(i: int) -> str:
sleep(0.2)
print(i)
return f"i"
e = ThreadPoolExecutor(4)
# Jobs are scheduled concurrently, this call does not block
futures = [e.submit(foo, i) for i in range(100)]
# Shutdown during execution and cancel pending jobs
e.shutdown(cancel_futures=True)
# Gather completed results
results = [f.result() for f in futures if not f.cancelled()]
print(results)
如果您执行此代码,您会看到 100 个计划的作业并未全部完成,只有一些是因为执行器已在其间关闭。
【讨论】:
这似乎是一种非常有用的方法。不幸的是,我没有在目标机器上设置 3.x,当我尝试将它用于我的 Conda 环境时,我得到 glibc 错误。感谢您的选择!如果我能够解决这些问题,我会花一些时间来解决这些 glibc 错误并尝试使用此选项。 不知何故,最近的版本(3.9)今天开始为我工作,没有改变任何东西。但是,当我使用 cancel_futures=True 运行它时,它会在池关闭后继续运行新作业。如果我使用.submit
和.map
提交作业有区别吗?要么这也不符合我的预期,要么我在提交作业时做错了。
是的,.submit
安排作业异步运行(它不等待结果),所以你必须等待未来的结果,而 .map
任务是同时运行的,但它等待所有要完成的任务。
我在回答中添加了细节
感谢您的精彩回答!我想澄清未来的读者要小心.map
,因为关闭池不会取消.map
创建的待处理作业。以上是关于你如何让多处理池不启动新进程但也不终止当前正在运行的进程?的主要内容,如果未能解决你的问题,请参考以下文章