如果子进程导致分段错误,multiprocessing.Pool 将挂起
Posted
技术标签:
【中文标题】如果子进程导致分段错误,multiprocessing.Pool 将挂起【英文标题】:multiprocessing.Pool hangs if child causes a segmentation fault 【发布时间】:2014-08-13 18:55:51 【问题描述】:我想使用 multiprocessing.Pool 并行应用一个函数。 问题是,如果一个函数调用触发了分段错误,那么 Pool 将永远挂起。 有谁知道我如何制作一个池来检测何时发生此类情况并引发错误?
下面的例子展示了如何重现它(需要 scikit-learn > 0.14)
import numpy as np
from sklearn.ensemble import gradient_boosting
import time
from multiprocessing import Pool
class Bad(object):
tree_ = None
def fit_one(i):
if i == 3:
# this will segfault
bad = np.array([[Bad()] * 2], dtype=np.object)
gradient_boosting.predict_stages(bad,
np.random.rand(20, 2).astype(np.float32),
1.0, np.random.rand(20, 2))
else:
time.sleep(1)
return i
pool = Pool(2)
out = pool.imap_unordered(fit_one, range(10))
# we will never see 3
for o in out:
print o
【问题讨论】:
修复分段错误?通常segfaults是由无效的内存访问引起的,这是undefined行为,根本不保证会导致segfault。 没有答案,但我可以说 joblib.Parallel 似乎永远挂起。据我所知,没有办法返回段错误或在多处理中添加“看门狗”超时。 其实,也许你可以添加一个超时装饰器?如这里所示:code.activestate.com/recipes/577028 查看这个答案:***.com/a/24396655/2073595。有点乱,但是你可以监控池中的各个进程,看看有没有意外重启。 这里还需要注意:concurrent.futures.ProcessPoolExecutor
确实会检测到进程何时被意外终止,并在发生任何未完成的任务时引发BrokenProcessPool
异常。还有针对multiprocessing
的a bug 提交了一个工作补丁,以将相同的行为添加到multiprocessing.Pool
。
【参考方案1】:
如 cmets 中所述,如果您使用 concurrent.Futures.ProcessPoolExecutor
而不是 multiprocessing.Pool
,这仅适用于 Python 3。
如果您坚持使用 Python 2,我发现的最佳选择是对 Pool.apply_async
和 Pool.map_async
返回的结果对象使用 timeout
参数。例如:
pool = Pool(2)
out = pool.map_async(fit_one, range(10))
for o in out:
print o.get(timeout=1000) # allow 1000 seconds max
只要您对子进程完成任务所需的时间有一个上限,此方法就可以工作。
【讨论】:
【参考方案2】:这是known bug, issue #22393, in Python。只要您使用multiprocessing.pool
,就没有有意义的解决方法,直到它被修复。该链接提供了一个补丁,但它尚未集成到主版本中,因此没有稳定的 Python 版本可以解决此问题。
【讨论】:
【参考方案3】:您可能宁愿使用Process()
自己手动创建子进程,而不是使用Pool().imap()
。我敢打赌,返回的对象将允许您获得任何孩子的活动状态。你会知道他们是否挂断。
【讨论】:
【参考方案4】:我还没有运行您的示例以查看它是否可以处理错误,但请尝试并发期货。只需将 my_function(i) 替换为您的 fit_one(i)。保留__name__=='__main__':
结构。并发期货似乎需要这个。下面的代码是在我的机器上测试过的,所以希望能直接在你的机器上运行。
import concurrent.futures
def my_function(i):
print('function running')
return i
def run():
number_processes=4
executor = concurrent.futures.ProcessPoolExecutor(number_processes)
futures = [executor.submit(my_function,i) for i in range(10)]
concurrent.futures.wait(futures)
for f in futures:
print(f.result())
if __name__ == '__main__':
run()
【讨论】:
我只是认为它可能会起作用,因为您可以在完成所有流程后返回的可迭代“期货”上调用各种方法。所以它可能能够从容应对错误。以上是关于如果子进程导致分段错误,multiprocessing.Pool 将挂起的主要内容,如果未能解决你的问题,请参考以下文章