Python多处理imap - 丢弃超时进程

Posted

技术标签:

【中文标题】Python多处理imap - 丢弃超时进程【英文标题】:Python multiprocessing imap - discard timeout processes 【发布时间】:2021-12-23 21:32:33 【问题描述】:

使用 Python 多处理我想捕获进程丢弃它们并继续下一个进程。

在下面的示例中,我有一个 1 和 0 的列表作为输入。 0 将启动睡眠功能以触发超时错误。触发超时的进程会重新执行,因此脚本将永远运行。

如何捕获 TimeOut 错误、终止导致该错误的进程并防止该进程重新执行?重要的是我可以使用 imap 做到这一点。

import time
import multiprocessing as mp

def a_func(x):
    print(x)
    if x:
        return x
    
    # Function sleeps before returning
    # to trigger timeout error
    else:
        time.sleep(2.0)
        return x


if __name__ == "__main__":
    solutions = []

    # Inputs sum to 4
    inputs = [1, 1, 0, 1, 1, 0]

    with mp.get_context("spawn").Pool(1) as pool:
        futures_res = pool.imap(a_func, inputs)
        idx = 0
        for s in (inputs):
            try:
                res = futures_res.next(timeout=0.1)
                # If successful (no time out), append the result
                solutions.append(res)
            
            except mp.context.TimeoutError:
                print(s, "err")
                # Catch time out error
                # I want this to also prevent the process from being executed again
                # solutions.append(0.0)

    # Should print 4
    print(len(solutions))
    print(solutions)

【问题讨论】:

【参考方案1】:

您可能对imap 如何处理超时感到有些困惑,或者您没有清楚地表达您的问题,或者我感到困惑。所以让我们从顶部开始:

为了确定当您对imap 返回的迭代器执行next(timeout=some_value) 时是否会引发multiprocessing.TimeoutError 异常,计时开始于任务被进程从队列中取出以执行。因此,如果池中只有一个进程并提交了 6 个任务,则不会执行并行处理,例如,第三个任务将在第二个任务完成之前不会启动,即第三个任务的计时开始而不是从提交所有任务开始。

但是当你得到一个超时异常时,正在执行的任务实际上并没有发生任何事情——它会继续执行。您仅从 imap 迭代返回值 6 次。但是如果你无限地迭代直到你得到一个StopIteration 异常,你最终会看到所有任务最终都完成并返回了一个值,可能会在此过程中抛出多个超时错误。

一种解决方案是继续从inputs 列表中删除与您正在迭代其结果的任务相对应的输入值,但是一旦您遇到超时异常,您就会终止池中的剩余任务(如果有的话) inputs 列表中仍保留的任何输入,使用新的 inputs 列表重新运行 imap

三点:当你终止池时,池中的进程可能已经开始执行输入队列上的下一个任务。所以这需要是一个可重新启动的任务。您还需要将输入列表的副本传递给imap,因为imap“lazily”评估了pasaed iterable,并且您将在迭代imap和@的返回值时修改inputs列表如果您没有通过副本,否则 987654335@ 仍会评估 inputs。您应该传递比 .1 稍大的超时值,因为在我的桌面上,即使将值 1 传递给工作函数,我仍然不时遇到超时异常。

import time
import multiprocessing as mp

def a_func(x):
    print(x)
    if x:
        return x

    # Function sleeps before returning
    # to trigger timeout error
    else:
        time.sleep(2.0)
        return x


if __name__ == "__main__":
    solutions = []

    # Inputs sum to 4
    inputs = [1, 1, 0, 1, 1, 0]

    while inputs:
        with mp.get_context("spawn").Pool(1) as pool:
            futures_res = pool.imap(a_func, inputs.copy())
            while inputs:
                s = inputs.pop(0)
                try:
                    res = futures_res.next(timeout=.5)
                    # If successful (no time out), append the result
                    solutions.append(res)
                except mp.context.TimeoutError:
                    print(s, "err")
                    break

    # Should print 4
    print(len(solutions))
    print(solutions)

打印:

1
1
0
0 err
1
1
0
0 err
4
[1, 1, 1, 1]

【讨论】:

谢谢,这已经为我解决了。您的解决方案看起来很适合保持剩余任务运行,同时删除导致我出现问题的任务!

以上是关于Python多处理imap - 丢弃超时进程的主要内容,如果未能解决你的问题,请参考以下文章

我可以在 Pool.imap 调用的函数中使用多处理队列吗?

Python多处理:超过超时后通过参数终止进程

进程池Pool的imap方法解析

Python多处理池:完成任何k个作业后终止进程

显示 Python 多处理池 imap_unordered 调用的进度?

内存泄漏在哪里? python - 如何在python中的多处理期间使线程超时?