A problem with apply_async in a multiprocessing pool
Posted: 2020-04-29 23:50:37
Question:
I am using a multiprocessing pool in Python and its .apply_async() method to run several workers concurrently.
However, I run into a problem when I use with instead of creating the pool instance explicitly.
Here is what I have done so far:
Common code snippet:
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor
from time import sleep, time

def worker(x):
    # Simulate x seconds of work and report progress.
    print(f"{x} started.")
    sleep(x)
    print(f"{x} finished.")
    return f"{x} finished."

result_list = []

def log_result(result):
    # Callback: runs in the parent process with the worker's return value.
    result_list.append(result)
First code snippet, which works well using the Python 2 approach:
tick = time()
pool = Pool()
for i in range(6):
    pool.apply_async(worker, args=(i,), callback=log_result)
pool.close()
pool.join()
print('Total elapsed time: ', time() - tick)
print(result_list)
print(i)  # Shows that every iteration has run.
Output:
1 started.
2 started.
0 started.
0 finished.
3 started.
4 started.
1 finished.
5 started.
2 finished.
3 finished.
4 finished.
5 finished.
Total elapsed time: 6.022687673568726
['0 finished.', '1 finished.', '2 finished.', '3 finished.', '4 finished.', '5 finished.']
5
Second code snippet, which works well using the Python 3 approach:
tick = time()
with ProcessPoolExecutor() as executor:
    for i in range(6):
        executor.submit(worker, i)
print('Total elapsed time: ', time() - tick)
print(i)  # Shows that every iteration has run.
Output:
0 started.
0 finished.
1 started.
2 started.
3 started.
4 started.
1 finished.
5 started.
2 finished.
3 finished.
4 finished.
5 finished.
Total elapsed time: 6.017550945281982
5
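Extra:
From the timings, the Python 3 approach appears slightly faster than the Python 2 approach (6.018 s versus 6.023 s).
Question:
Now here is the problem: I want to implement the Python 2 approach using with, as in the Python 3 approach, but the tasks do not complete:
tick = time()
with Pool() as pool:
    for i in range(6):
        pool.apply_async(worker, args=(i,), callback=log_result)
print('Total elapsed time: ', time() - tick)
print(result_list)
print(i)  # Shows that every iteration has run.
Output: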
Total elapsed time: 0.10628008842468262
[]
5
However, if I place a sleep(1) after pool.apply_async(...), some of the shorter tasks do finish (the sleep acts as a block):
tick = time()
with Pool() as pool:
    for i in range(6):
        pool.apply_async(worker, args=(i,), callback=log_result)
        sleep(1)
print('Total elapsed time: ', time() - tick)
print(result_list)
print(i)  # Shows that every iteration has run.
Output:
0 started.
0 finished.
1 started.
2 started.
1 finished.
3 started.
4 started.
2 finished.
5 started.
3 finished.
Total elapsed time: 6.022568702697754
['0 finished.', '1 finished.', '2 finished.', '3 finished.']
5
What am I missing?
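Answer 1:
concurrent.futures.Executor and multiprocessing.Pool have two completely different context manager implementations.
concurrent.futures.Executor calls shutdown(wait=True) on exit, which effectively waits for all queued jobs to finish, as stated in the documentation:
You can avoid having to call this method explicitly if you use the with statement, which will shutdown the Executor (waiting as if Executor.shutdown() were called with wait set to True).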
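As a minimal sketch of that behaviour, the example below collects return values from a ProcessPoolExecutor through add_done_callback, roughly mirroring the callback=log_result pattern from the question. The function name run_with_executor and the scaled-down sleep are assumptions made for illustration; they are not part of the original post.

```python
from concurrent.futures import ProcessPoolExecutor
from time import sleep


def worker(x):
    """Sleep briefly, then report completion (modelled on the question's worker)."""
    sleep(x * 0.1)  # scaled down from sleep(x) so the demo finishes quickly
    return f"{x} finished."


def run_with_executor(n=4):
    """Hypothetical helper: submit n tasks and gather their results via callbacks."""
    results = []
    with ProcessPoolExecutor() as executor:
        for i in range(n):
            future = executor.submit(worker, i)
            # A done-callback receives the Future itself, not the return value.
            future.add_done_callback(lambda f: results.append(f.result()))
    # Leaving the with block called shutdown(wait=True), so all tasks are done.
    return sorted(results)


if __name__ == "__main__":
    print(run_with_executor())
```

Because the with block only exits once every submitted task has finished, no explicit wait is needed before reading the results.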
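multiprocessing.Pool calls terminate() on exit instead of close() followed by join(), which interrupts all in-progress jobs prematurely. From the documentation:
Pool objects now support the context management protocol - see Context Manager Types. __enter__() returns the pool object, and __exit__() calls terminate().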
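One way to keep the with statement and still let every task finish is to call close() and join() before the block ends, so that terminate() has nothing left to interrupt. The sketch below assumes a shortened worker and a hypothetical helper named run_with_pool; neither comes from the original post.

```python
from multiprocessing import Pool
from time import sleep


def worker(x):
    """Sleep briefly, then report completion (modelled on the question's worker)."""
    sleep(x * 0.1)  # scaled down from sleep(x) so the demo finishes quickly
    return f"{x} finished."


def run_with_pool(n=4):
    """Hypothetical helper: run n tasks to completion inside a with block."""
    results = []
    with Pool() as pool:
        for i in range(n):
            pool.apply_async(worker, args=(i,), callback=results.append)
        pool.close()  # no further tasks may be submitted
        pool.join()   # block until the workers have finished every task
    # __exit__ still calls terminate(), but the pool is already idle by then.
    return sorted(results)


if __name__ == "__main__":
    print(run_with_pool())
```

Here the with statement mainly guarantees cleanup if an exception is raised partway through; the waiting itself is done by close() and join().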
If you want to use multiprocessing.Pool together with its context manager, you need to wait for the results yourself.
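with Pool() as pool:
    # Submit every task first, then wait, so the tasks still run in parallel.
    async_results = [pool.apply_async(worker, args=(i,), callback=log_result) for i in range(6)]
    for async_result in async_results:
        async_result.wait()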
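If the return values are needed directly rather than through a callback, AsyncResult.get() is an alternative to wait(): it blocks until the task finishes, returns the value, and re-raises any exception raised in the worker. The following is a sketch under assumptions: the helper name collect_results, the shortened sleep, and the 10-second timeout are illustrative choices, not part of the original answer.

```python
from multiprocessing import Pool
from time import sleep


def worker(x):
    """Sleep briefly, then report completion (modelled on the question's worker)."""
    sleep(x * 0.1)  # scaled down from sleep(x) so the demo finishes quickly
    return f"{x} finished."


def collect_results(n=4, timeout=10):
    """Hypothetical helper: return the results of n tasks in submission order."""
    with Pool() as pool:
        # Submit everything before waiting so the tasks overlap in time.
        pending = [pool.apply_async(worker, args=(i,)) for i in range(n)]
        # get() blocks per task; it raises multiprocessing.TimeoutError
        # if a result is not ready within `timeout` seconds.
        return [result.get(timeout=timeout) for result in pending]


if __name__ == "__main__":
    print(collect_results())
```

The results come back in submission order, because the list is built by iterating over the AsyncResult objects rather than by completion time.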
Comments:
Sorry, I wrote new code like the above, but it did not work.