Python多处理模块:加入超时进程

Posted

技术标签:

【中文标题】Python多处理模块:加入超时进程【英文标题】:Python multiprocessing module: join processes with timeout 【发布时间】:2014-11-21 17:16:21 【问题描述】:

我正在优化复杂模拟的参数。我正在使用多处理模块来提高优化算法的性能。我在http://pymotw.com/2/multiprocessing/basics.html 学到的多处理基础知识。 根据优化算法的给定参数,复杂的模拟持续不同的时间,大约 1 到 5 分钟。如果参数选择得非常糟糕,模拟可能会持续 30 分钟或更长时间,并且结果没有用处。所以我正在考虑在多处理中建立一个超时,这会终止所有持续时间超过定义时间的模拟。这是问题的抽象版本:

import numpy as np
import time
import multiprocessing

def worker(num):
    
    time.sleep(np.random.random()*20)

def main():
    
    pnum = 10    
    
    procs = []
    for i in range(pnum):
        p = multiprocessing.Process(target=worker, args=(i,), name = ('process_' + str(i+1)))
        procs.append(p)
        p.start()
        print('starting', p.name)
        
    for p in procs:
        p.join(5)
        print('stopping', p.name)
     
if __name__ == "__main__":
    main()

p.join(5) 行定义了 5 秒的超时。由于 for 循环 for p in procs: 程序等待 5 秒直到第一个进程完成,然后再等待 5 秒直到第二个进程完成等等,但我希望程序终止所有持续超过 5 秒的进程.此外,如果没有一个进程持续超过 5 秒,则程序不得等待这 5 秒。

【问题讨论】:

看看这里:***.com/q/1191374/2615940。它可能是重复的,但我不确定是否足以为您标记它。如果针对该答案的建议解决方案不适合您,请告诉我们原因。 这是一篇有趣的文章,但在我看来,它是连续启动进程而不是同时启动进程的解决方案。我的程序应该同时启动进程并杀死那些超过“全局”超时的进程。 【参考方案1】:

感谢dano的帮助,我找到了解决办法:

import numpy as np
import time
import multiprocessing

def worker(num):

    time.sleep(np.random.random()*20)

def main():

    pnum = 10    
    TIMEOUT = 5 
    procs = []
    bool_list = [True]*pnum

    for i in range(pnum):
        p = multiprocessing.Process(target=worker, args=(i,), name = ('process_' + str(i+1)))
        procs.append(p)
        p.start()
        print('starting', p.name)

    start = time.time()
    while time.time() - start <= TIMEOUT:
        for i in range(pnum):
            bool_list[i] = procs[i].is_alive()
            
        print(bool_list)
            
        if np.any(bool_list):  
            time.sleep(.1)  
        else:
            break
    else:
        print("timed out, killing all processes")
        for p in procs:
            p.terminate()
            
    for p in procs:
        print('stopping', p.name,'=', p.is_alive())
        p.join()

if __name__ == "__main__":
    main()

这不是最优雅的方式,我相信有比使用bool_list 更好的方式。超时 5 秒后仍处于活动状态的进程将被杀死。如果您在工作函数中设置的时间比超时时间更短,您将看到程序在达到 5 秒的超时时间之前停止。如果有的话,我仍然愿意接受更优雅的解决方案:)

【讨论】:

【参考方案2】:

如果你想杀死所有进程,你可以使用多处理池 您需要为所有执行定义一个一般超时,而不是单独的超时。

import numpy as np
import time
from multiprocessing import Pool

def worker(num):
    xtime = np.random.random()*20
    time.sleep(xtime)
    return xtime

def main():

    pnum = 10
    pool = Pool()
    args = range(pnum)
    pool_result = pool.map_async(worker, args)

    # wait 5 minutes for every worker to finish
    pool_result.wait(timeout=300)

    # once the timeout has finished we can try to get the results
    if pool_result.ready():
        print(pool_result.get(timeout=1))

if __name__ == "__main__":
    main()

这将为您提供一个列表,其中包含所有工作人员的返回值。 更多信息在这里: https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.pool

【讨论】:

我不认为这实际上会终止池中的线程——它只是将执行返回给主线程,即使它们没有完成 我不明白我们为什么要做pool_result.get(timeout=1) 即:如果pool_result 已经准备好了,结果不应该也准备好了,并且不需要超时吗?【参考方案3】:

您可以通过创建一个循环来执行此操作,该循环将等待一些超时秒数,并经常检查所有进程是否已完成。如果它们没有在分配的时间内全部完成,则终止所有进程:

TIMEOUT = 5 
start = time.time()
while time.time() - start <= TIMEOUT:
    if not any(p.is_alive() for p in procs):
        # All the processes are done, break now.
        break

    time.sleep(.1)  # Just to avoid hogging the CPU
else:
    # We only enter this if we didn't 'break' above.
    print("timed out, killing all processes")
    for p in procs:
        p.terminate()
        p.join()

【讨论】:

谢谢,这似乎是一个合适的解决方案。不幸的是,如果进程在达到超时之前完成,则此代码不会中断。我通过将工作函数设置为time.sleep(1) 进行了尝试,1 秒后所有p.is_alive() 返回False。所以代码现在应该转到break 语句,但它仍在等待超时... 我发现了问题:print (p.is_alive() for p in procs) 返回&lt;generator object &lt;genexpr&gt; at 0x05712B20&gt; 但它应该是一个包含TrueFalse 元素的列表,以便any() 可以理解 @brp 使用any([p.is_alive() for p in procs])。这样它就变成了列表推导而不是生成器表达式。 @brp 哦,我刚刚注意到您使用的是np.any,而不是内置的any。这就是生成器表达式不起作用的原因。 np.any 仅适用于类数组对象。 内置 any 与列表理解是关键!谢谢!

以上是关于Python多处理模块:加入超时进程的主要内容,如果未能解决你的问题,请参考以下文章

Python多处理:超过超时后通过参数终止进程

使用多处理时加入进程是不是有顺序规则?

python 使用多处理模块通过多个进程执行功能的玩具示例。

为啥 Python 的多处理模块在 Windows 上启动新进程时会导入 __main__?

多处理在新控制台 python 中运行进程以获取每个进程的输入

Python 子进程、通信和多处理/多线程