如何在继续之前等待所有 multiprocessing.Processes 完成?

Posted

技术标签:

【中文标题】如何在继续之前等待所有 multiprocessing.Processes 完成?【英文标题】:How to wait for all multiprocessing.Processes to complete before continuing? 【发布时间】:2021-01-21 12:57:54 【问题描述】:

我正在学习 Python multiprocessing 并试图了解如何让我的代码等待所有进程完成,然后继续执行其余代码。我认为join() 方法应该可以完成这项工作,但是我的代码输出不是我使用它所期望的。

代码如下:

from multiprocessing import Process
import time 

def fun():

    print('starting fun')
    time.sleep(2)
    print('finishing fun')

def fun2():

    print('starting fun2')
    time.sleep(5)
    print('finishing fun2')

def fun3():

    print('starting fun3')
    print('finishing fun3')   

if __name__ == '__main__':
    processes = []
    print('starting main')
    for i in [fun, fun2, fun3]:
        p = Process(target=i)
        p.start()
        processes.append(p)
    for p in processes:
        p.join()  
    print('finishing main')

g=0
print("g",g)

我希望 if __name__ == '__main__': 下的所有进程在调用 g=0print(g) 行之前完成,所以应该会出现这样的情况:

starting main
starting fun2
starting fun
starting fun3
finishing fun3
finishing fun
finishing fun2
finishing main
g 0

但实际输出表明我对join()(或一般multiprocessing)有些不理解:

starting main
g 0
g 0
starting fun2
g 0
starting fun
starting fun3
finishing fun3
finishing fun
finishing fun2
finishing main
g 0

问题是:如何编写先完成所有进程的代码,然后在没有多处理的情况下继续执行代码,以便获得前一个输出?我在 Windows 上从命令提示符运行代码,以防万一。

【问题讨论】:

不同但相关的问题:***.com/questions/64126594/… .join() 没有问题,但缩进有问题。见类似案例here。 【参考方案1】:

等待Process 完成:

你可以Process.join你的列表,比如

import multiprocessing
import time

def func1():
    time.sleep(1)
    print('func1')

def func2():
    time.sleep(2)
    print('func2')

def func3():
    time.sleep(3)
    print('func3')

def main():
    processes = [
        multiprocessing.Process(target=func1),
        multiprocessing.Process(target=func2),
        multiprocessing.Process(target=func3),
    ]
    for p in processes:
        p.start()

    for p in processes:
        p.join()

if __name__ == '__main__':
    main()

但如果您正在考虑让您的流程更加复杂,请尝试使用Pool

import multiprocessing
import time

def func1():
    time.sleep(1)
    print('func1')

def func2():
    time.sleep(2)
    print('func2')

def func3():
    time.sleep(3)
    print('func3')

def main():
    result = []
    with multiprocessing.Pool() as pool:
        result.append(pool.apply_async(func1))
        result.append(pool.apply_async(func2))
        result.append(pool.apply_async(func3))

        for r in result:
            r.wait()

if __name__ == '__main__':
    main()

More info on Pool

为什么 g0 会打印多次:

发生这种情况是因为您使用spawnforkserver 来设置Process,而g0print 声明位于函数或__main__ if 块之外。

来自the docs:

确保新的 Python 解释器可以安全地导入主模块,而不会导致意外的副作用(例如启动新进程)。

(...)

这允许新生成的 Python 解释器安全地导入模块,然后运行模块的 foo() 函数。

如果在主模块中创建池或管理器,则适用类似的限制。

它基本上是再次解释,因为它将您的 .py 文件作为模块导入。

【讨论】:

使用像[p.start() for p in processes]这样的列表解析,它只是在创建列表后丢弃列表,只是为了多次执行其他操作而不是使用常规的for-循环既低效又普遍许多人(包括我自己)认为这是一种糟糕的 Python 编程实践。 您说得对,感谢您的反馈。我已经编辑了我的回复以反映这一点。 绝对是一种改进,IMO。 如果你打电话等待,你可能不想要async,可以关注this example

以上是关于如何在继续之前等待所有 multiprocessing.Processes 完成?的主要内容,如果未能解决你的问题,请参考以下文章

如何在继续之前等待 requestLocation?

在继续之前等待图像加载

如何使用事件发射器调用返回值的函数,并在继续之前等待响应?

如何让一个方法在继续执行之前等待另一个(异步)方法完成?

节点在继续之前等待异步功能

Service Fabric Start-ServiceFabricApplicationUpgrade - 如何让 powershell 在继续之前等待升级成功