A problem with apply_async in a multiprocessing pool

Posted: 2020-04-29 23:50:37

Question:

I'm using a multiprocessing Pool and its .apply_async() method in Python to run several workers concurrently.

But there is a problem when I use a with statement instead of creating the Pool instance explicitly.

Here's what I've done so far:


Common code used by all the snippets below:

from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor
from time import sleep, time

def worker(x):
    print(f"{x} started.")
    sleep(x)
    print(f"{x} finished.")
    return f"{x} finished."

result_list = []
def log_result(result):
    result_list.append(result)

The first snippet, written the "Python 2 way", works fine:

tick = time()

pool = Pool()
for i in range(6):
    pool.apply_async(worker, args=(i, ), callback=log_result)
pool.close()
pool.join()

print('Total elapsed time: ', time() - tick)
print(result_list)
print(i)  # Indicates that all iteration has been done.

Output:

1 started.
2 started.
0 started.
0 finished.
3 started.
4 started.
1 finished.
5 started.
2 finished.
3 finished.
4 finished.
5 finished.
Total elapsed time:  6.022687673568726
['0 finished.', '1 finished.', '2 finished.', '3 finished.', '4 finished.', '5 finished.']
5

The second snippet, written the "Python 3 way", also works fine:

tick = time()

with ProcessPoolExecutor() as executor:
    for i in range(6):
        executor.submit(worker, i)

print('Total elapsed time: ', time() - tick)
print(i)  # Indicates that all iteration has been done.

Output:

0 started.
0 finished.
1 started.
2 started.
3 started.
4 started.
1 finished.
5 started.
2 finished.
3 finished.
4 finished.
5 finished.
Total elapsed time:  6.017550945281982
5

Side note:

The Python 3 way appears to be slightly faster than the Python 2 way (about 6.018 s vs. 6.023 s in the runs above).

Problem:

Now here is the problem: I want to write the Python 2 way using a with statement, like the Python 3 approach, but the tasks don't complete:

tick = time()

with Pool() as pool:
    for i in range(6):
        pool.apply_async(worker, args=(i,), callback=log_result)

print('Total elapsed time: ', time() - tick)
print(result_list)
print(i)  # Indicates that all iteration has been done.

Output:

Total elapsed time:  0.10628008842468262
[]
5

However, if I put a sleep(1) after pool.apply_async(...), some of the shorter tasks do finish (the sleep acts as a block):

tick = time()

with Pool() as pool:
    for i in range(6):
        pool.apply_async(worker, args=(i,), callback=log_result)
        sleep(1)

print('Total elapsed time: ', time() - tick)
print(result_list)
print(i)  # Indicates that all iteration has been done.

Output:

0 started.
0 finished.
1 started.
2 started.
1 finished.
3 started.
4 started.
2 finished.
5 started.
3 finished.
Total elapsed time:  6.022568702697754
['0 finished.', '1 finished.', '2 finished.', '3 finished.']
5

What am I missing?


Answer 1:

concurrent.futures.Executor and multiprocessing.Pool have two completely different context manager implementations.

When its with block ends, concurrent.futures.Executor calls shutdown(wait=True), which effectively waits for all queued jobs to finish, as stated in the documentation:

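You can avoid having to call this method explicitly if you use the with statement, which will shutdown the Executor (waiting as if Executor.shutdown() were called with wait set to True).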
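A minimal sketch of that equivalence, assuming a simplified worker (shorter sleeps, no printing) and a hypothetical helper name run_explicit_shutdown: the explicit shutdown(wait=True) call below does what leaving a with ProcessPoolExecutor() block does, and calling result() on each Future collects the return values.

```python
from concurrent.futures import ProcessPoolExecutor
from time import sleep


def worker(x):
    # Simplified stand-in for the question's worker (assumption: shorter sleeps, no printing).
    sleep(x * 0.1)
    return f"{x} finished."


def run_explicit_shutdown():
    # Hypothetical helper, for illustration only.
    executor = ProcessPoolExecutor()
    futures = [executor.submit(worker, i) for i in range(6)]
    # Leaving a `with ProcessPoolExecutor()` block does this:
    # shutdown(wait=True) blocks until every submitted job has completed.
    executor.shutdown(wait=True)
    # The futures list is in submission order, so the results are too.
    return [f.result() for f in futures]


if __name__ == "__main__":
    print(run_explicit_shutdown())
```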

multiprocessing.Pool, on the other hand, calls terminate() rather than close() followed by join(), which stops all work in progress prematurely. From the documentation:

Pool objects now support the context management protocol (see Context Manager Types). __enter__() returns the pool object, and __exit__() calls terminate().
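Given that __exit__() calls terminate(), one way to keep the with statement is to call close() and join() yourself before the block ends. A minimal sketch, assuming a simplified worker and a hypothetical helper run_pool_with_close_join; by the time join() returns, the pool's result-handler thread has also run every callback.

```python
from multiprocessing import Pool
from time import sleep


def worker(x):
    # Simplified stand-in for the question's worker (assumption: shorter sleeps, no printing).
    sleep(x * 0.1)
    return f"{x} finished."


def run_pool_with_close_join():
    # Hypothetical helper, for illustration only.
    result_list = []
    with Pool() as pool:
        for i in range(6):
            # The callback runs in the parent process, in the pool's result-handler thread.
            pool.apply_async(worker, args=(i,), callback=result_list.append)
        pool.close()  # no more tasks will be submitted
        pool.join()   # wait for all tasks and their callbacks to finish
    # __exit__ has now called terminate(), but there is nothing left to interrupt.
    return result_list


if __name__ == "__main__":
    print(run_pool_with_close_join())
```

Callbacks fire in completion order, so result_list is not guaranteed to be in submission order.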

If you want to use multiprocessing.Pool with its context manager, you have to wait for the results yourself before the with block ends:

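with Pool() as pool:
    async_results = [pool.apply_async(worker, args=(i,), callback=log_result)
                     for i in range(6)]
    for async_result in async_results:
        async_result.wait()  # blocks until this task (and its callback) is done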
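An alternative sketch, assuming a simplified worker and a hypothetical helper run_pool_with_get: instead of a callback, keep each AsyncResult and call get(), which waits for the task, returns the worker's value, and re-raises any exception the worker raised. The 30-second timeout is an arbitrary choice.

```python
from multiprocessing import Pool
from time import sleep


def worker(x):
    # Simplified stand-in for the question's worker (assumption: shorter sleeps, no printing).
    sleep(x * 0.1)
    return f"{x} finished."


def run_pool_with_get():
    # Hypothetical helper, for illustration only.
    with Pool() as pool:
        # Keep every AsyncResult handle instead of discarding it.
        async_results = [pool.apply_async(worker, args=(i,)) for i in range(6)]
        # get() blocks until each task is done; the whole list is built
        # before __exit__ calls terminate().
        return [r.get(timeout=30) for r in async_results]


if __name__ == "__main__":
    print(run_pool_with_get())
```

Because the handles are kept in a list, the results come back in submission order regardless of which worker finished first.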

Comments:

Sorry, I wrote the new code as above, but it didn't work.
