python multiprocessing pool.map() 等到方法完成
Posted
技术标签:
【中文标题】python multiprocessing pool.map() 等到方法完成【英文标题】:python multiprocessing pool.map() wait until method is done 【发布时间】:2021-11-02 19:47:19 【问题描述】:我是 python 多处理的新手,并在认真使用之前进行了一些实验。现在我正在使用 pool.map() 这似乎工作得很好。但我想在继续我的程序的其余部分之前完成池作业:
import time
import multiprocessing as mp
res1 = []
def my_func(x):
print(mp.current_process())
res1.append(x**x)
def main():
print(mp.cpu_count())
pool = mp.Pool(mp.cpu_count())
tic = time.time()
pool.map(my_func, [4444444,2222222,3333333,5555555,3333333,2222222,1111111,2222222])
toc = time.time()
t = toc-tic
print(t)
if __name__ == "__main__":
main()
print('Hi')
现在“hi”打印在我的进程之间,但我希望先完成计算过程,最后它应该打印“hi”。
我尝试了 pool.close() 和 pool.join() 但在 my_func() 内部它似乎什么都没有改变,而在外部,比如在 main() 中,它不知道我的池对象(当然,因为它是在 main()) 中声明
我知道这只是一个测试程序,但我需要将这个概念用于我论文中的一个更大的项目。所以我很感激我能得到的每一个帮助。提前非常感谢!
【问题讨论】:
【参考方案1】:一些事情:
首先,您传递的x**x
和x
的值是一个非常 大的数字,需要相当长的时间来计算。在我的带有 8 个逻辑处理器(cpu_count()
返回 8)的桌面上,map
函数需要 99 秒才能完成——但它确实完成了。
其次,每个进程附加到的全局变量res1
是每个进程唯一的。也就是说,每个进程都在自己的地址空间中运行(这是多处理的属性),因此有自己的res1
副本,这就是为什么主进程的res1
在调用返回时为空的原因map
。相反,您的工作函数,即my_func
,应该返回其结果,然后map
的返回值将是所有返回值的列表。
第三,这是在 Windows 下如何创建进程的属性,不在if __name__ = "__main__":
块内的全局范围内的任何语句都将由每个新创建的进程在其初始化时执行,这就是为什么你会看到@ 987654333@ 立即打印cpu_count()
次。您应该在全局范围内删除对print('Hi')
的调用并将其放在my_func
中(参见最后一个代码示例)。
因为生命短暂,我修改了my_func
,只是为了返回其参数的平方:
import time
import multiprocessing as mp
def my_func(x):
print(mp.current_process())
return x * x
def main():
print(mp.cpu_count())
pool = mp.Pool(mp.cpu_count())
tic = time.time()
results = pool.map(my_func, [4444444,2222222,3333333,5555555,3333333,2222222,1111111,2222222])
print('Hi')
toc = time.time()
t = toc-tic
print(t)
print(results)
if __name__ == "__main__":
main()
打印:
8
<SpawnProcess name='SpawnPoolWorker-3' parent=3232 started daemon>
<SpawnProcess name='SpawnPoolWorker-1' parent=3232 started daemon>
<SpawnProcess name='SpawnPoolWorker-3' parent=3232 started daemon>
<SpawnProcess name='SpawnPoolWorker-3' parent=3232 started daemon>
<SpawnProcess name='SpawnPoolWorker-1' parent=3232 started daemon>
<SpawnProcess name='SpawnPoolWorker-3' parent=3232 started daemon>
<SpawnProcess name='SpawnPoolWorker-3' parent=3232 started daemon>
<SpawnProcess name='SpawnPoolWorker-1' parent=3232 started daemon>
Hi
0.17300081253051758
[19753082469136, 4938270617284, 11111108888889, 30864191358025, 11111108888889, 4938270617284, 1234567654321, 4938270617284]
您会注意到,尽管池大小为 8,但池中的单个进程处理了 7 个结果。其原因如下。将要处理的 8 个“任务”,由传递给 map
方法的 iterable 中的 8 个数字表示,以一定大小的“块”放置在要处理的任务输入队列中.如果不指定 chunksize 参数,则默认为None
,map
方法将根据可迭代的大小和池大小计算合适的值。在这种情况下,将使用 1 的 chunksize 值。空闲进程将从输入队列中获取下一个任务块并执行该块。在这种情况下,池中的一个进程抓取了一个块并以如此快的速度执行它,以至于它能够返回并抓取下一个块并在池中的任何其他进程被调度到另一个处理器之前执行它。事实上,它还能再做 6 次。
通过调用time.sleep
并确保使用 chunksize 值 1 使 my_func
的乐趣更长一点,我们确保我们给每个处理器一个机会一个任务:
import time
import multiprocessing as mp
import time
def my_func(x):
time.sleep(.1)
print(mp.current_process())
return x * x
def main():
print(mp.cpu_count())
pool = mp.Pool(mp.cpu_count())
tic = time.time()
results = pool.map(my_func, [4444444,2222222,3333333,5555555,3333333,2222222,1111111,2222222], chunksize=1)
toc = time.time()
t = toc-tic
print(t)
print(results)
if __name__ == "__main__":
main()
print('Hi')
打印:
8
Hi
Hi
Hi
Hi
Hi
Hi
Hi
Hi
<SpawnProcess name='SpawnPoolWorker-1' parent=3116 started daemon>
<SpawnProcess name='SpawnPoolWorker-4' parent=3116 started daemon>
<SpawnProcess name='SpawnPoolWorker-3' parent=3116 started daemon>
<SpawnProcess name='SpawnPoolWorker-2' parent=3116 started daemon>
<SpawnProcess name='SpawnPoolWorker-6' parent=3116 started daemon>
<SpawnProcess name='SpawnPoolWorker-7' parent=3116 started daemon>
<SpawnProcess name='SpawnPoolWorker-5' parent=3116 started daemon>
<SpawnProcess name='SpawnPoolWorker-8' parent=3116 started daemon>
0.28499770164489746
[19753082469136, 4938270617284, 11111108888889, 30864191358025, 11111108888889, 4938270617284, 1234567654321, 4938270617284]
Hi
同样,通过指定 chunksize=8,我们保证所有 8 个任务都将由一个进程处理,而其他 7 个处于空闲状态:
import time
import multiprocessing as mp
import time
def my_func(x):
time.sleep(.1)
print(mp.current_process())
print('Hi')
return x * x
def main():
print(mp.cpu_count())
pool = mp.Pool(mp.cpu_count())
tic = time.time()
results = pool.map(my_func, [4444444,2222222,3333333,5555555,3333333,2222222,1111111,2222222], chunksize=8)
toc = time.time()
t = toc-tic
print(t)
print(results)
if __name__ == "__main__":
main()
打印:
8
<SpawnProcess name='SpawnPoolWorker-4' parent=22284 started daemon>
Hi
<SpawnProcess name='SpawnPoolWorker-4' parent=22284 started daemon>
Hi
<SpawnProcess name='SpawnPoolWorker-4' parent=22284 started daemon>
Hi
<SpawnProcess name='SpawnPoolWorker-4' parent=22284 started daemon>
Hi
<SpawnProcess name='SpawnPoolWorker-4' parent=22284 started daemon>
Hi
<SpawnProcess name='SpawnPoolWorker-4' parent=22284 started daemon>
Hi
<SpawnProcess name='SpawnPoolWorker-4' parent=22284 started daemon>
Hi
<SpawnProcess name='SpawnPoolWorker-4' parent=22284 started daemon>
Hi
0.9750022888183594
[19753082469136, 4938270617284, 11111108888889, 30864191358025, 11111108888889, 4938270617284, 1234567654321, 4938270617284]
不用说,这是一个糟糕的 chunksize 值,可用于这个 iterable/pool 大小组合。
【讨论】:
首先:非常感谢您为这个答案付出的所有努力。这对我有很大的帮助!但是,我还有一个问题,也许我的问题不够清楚:我希望在所有 8 个进程完成后 打印“hi”。 “hi”实际上只表示在方法“main()”完成后应该运行的其他代码。所以,我想将所有计算的东西加入一个列表,然后继续使用这个计算值列表。 我已经更新了第一个代码示例。当对pool.map
的调用返回 8 个结果列表(map
blocks 直到传递的 iterable 的所有元素都被池和结果已返回)。所以我只是将调用后的打印语句移动到map
。
好的,我明白了。并且说我在池之后还有一些函数要执行,这些函数使用变量“结果”。我可以简单地将所有这些放在“main()”函数中的“hi”下面,对吧?非常感谢!
对!但是请参阅What should I do when someone answers my question?,特别是关于接受答案而不是表示感谢的部分。
别担心。我只是想让它保持打开状态,直到我真正清楚它是如何工作的。以上是关于python multiprocessing pool.map() 等到方法完成的主要内容,如果未能解决你的问题,请参考以下文章