Python多处理imap - 丢弃超时进程
Posted
技术标签:
【中文标题】Python多处理imap - 丢弃超时进程【英文标题】:Python multiprocessing imap - discard timeout processes 【发布时间】:2021-12-23 21:32:33 【问题描述】:使用 Python 多处理我想捕获进程丢弃它们并继续下一个进程。
在下面的示例中,我有一个 1 和 0 的列表作为输入。 0 将启动睡眠功能以触发超时错误。触发超时的进程会重新执行,因此脚本将永远运行。
如何捕获 TimeOut 错误、终止导致该错误的进程并防止该进程重新执行?重要的是我可以使用 imap 做到这一点。
import time
import multiprocessing as mp
def a_func(x):
print(x)
if x:
return x
# Function sleeps before returning
# to trigger timeout error
else:
time.sleep(2.0)
return x
if __name__ == "__main__":
solutions = []
# Inputs sum to 4
inputs = [1, 1, 0, 1, 1, 0]
with mp.get_context("spawn").Pool(1) as pool:
futures_res = pool.imap(a_func, inputs)
idx = 0
for s in (inputs):
try:
res = futures_res.next(timeout=0.1)
# If successful (no time out), append the result
solutions.append(res)
except mp.context.TimeoutError:
print(s, "err")
# Catch time out error
# I want this to also prevent the process from being executed again
# solutions.append(0.0)
# Should print 4
print(len(solutions))
print(solutions)
【问题讨论】:
【参考方案1】:您可能对imap
如何处理超时感到有些困惑,或者您没有清楚地表达您的问题,或者我感到困惑。所以让我们从顶部开始:
为了确定当您对imap
返回的迭代器执行next(timeout=some_value)
时是否会引发multiprocessing.TimeoutError
异常,计时开始于任务被进程从队列中取出以执行。因此,如果池中只有一个进程并提交了 6 个任务,则不会执行并行处理,例如,第三个任务将在第二个任务完成之前不会启动,即第三个任务的计时开始而不是从提交所有任务开始。
但是当你得到一个超时异常时,正在执行的任务实际上并没有发生任何事情——它会继续执行。您仅从 imap
迭代返回值 6 次。但是如果你无限地迭代直到你得到一个StopIteration
异常,你最终会看到所有任务最终都完成并返回了一个值,可能会在此过程中抛出多个超时错误。
一种解决方案是继续从inputs
列表中删除与您正在迭代其结果的任务相对应的输入值,但是一旦您遇到超时异常,您就会终止池中的剩余任务(如果有的话) inputs
列表中仍保留的任何输入,使用新的 inputs
列表重新运行 imap
。
三点:当你终止池时,池中的进程可能已经开始执行输入队列上的下一个任务。所以这需要是一个可重新启动的任务。您还需要将输入列表的副本传递给imap
,因为imap
“lazily”评估了pasaed iterable,并且您将在迭代imap
和@的返回值时修改inputs
列表如果您没有通过副本,否则 987654335@ 仍会评估 inputs
。您应该传递比 .1 稍大的超时值,因为在我的桌面上,即使将值 1 传递给工作函数,我仍然不时遇到超时异常。
import time
import multiprocessing as mp
def a_func(x):
print(x)
if x:
return x
# Function sleeps before returning
# to trigger timeout error
else:
time.sleep(2.0)
return x
if __name__ == "__main__":
solutions = []
# Inputs sum to 4
inputs = [1, 1, 0, 1, 1, 0]
while inputs:
with mp.get_context("spawn").Pool(1) as pool:
futures_res = pool.imap(a_func, inputs.copy())
while inputs:
s = inputs.pop(0)
try:
res = futures_res.next(timeout=.5)
# If successful (no time out), append the result
solutions.append(res)
except mp.context.TimeoutError:
print(s, "err")
break
# Should print 4
print(len(solutions))
print(solutions)
打印:
1
1
0
0 err
1
1
0
0 err
4
[1, 1, 1, 1]
【讨论】:
谢谢,这已经为我解决了。您的解决方案看起来很适合保持剩余任务运行,同时删除导致我出现问题的任务!以上是关于Python多处理imap - 丢弃超时进程的主要内容,如果未能解决你的问题,请参考以下文章
我可以在 Pool.imap 调用的函数中使用多处理队列吗?