如何在多处理器系统上生成并行子进程?
Posted
技术标签:
【中文标题】如何在多处理器系统上生成并行子进程?【英文标题】:How to spawn parallel child processes on a multi-processor system? 【发布时间】:2010-10-27 10:42:03 【问题描述】:我有一个 Python 脚本,我想将它用作另一个 Python 脚本的控制器。我有一个有 64 个处理器的服务器,所以想为第二个 Python 脚本生成多达 64 个子进程。子脚本被调用:
$ python create_graphs.py --name=NAME
其中 NAME 类似于 XYZ、ABC、NYU 等。
在我的父控制器脚本中,我从列表中检索名称变量:
my_list = [ 'XYZ', 'ABC', 'NYU' ]
所以我的问题是,将这些进程作为子进程生成的最佳方法是什么?我想一次将子进程的数量限制为 64,因此需要跟踪状态(如果子进程已完成或未完成),以便我可以有效地保持整代运行。
我研究过使用 subprocess 包,但拒绝了它,因为它一次只产生一个孩子。我终于找到了多处理器包,但我承认对整个线程与子进程的文档感到不知所措。
现在,我的脚本使用 subprocess.call
一次只生成一个孩子,如下所示:
#!/path/to/python
import subprocess, multiprocessing, Queue
from multiprocessing import Process
my_list = [ 'XYZ', 'ABC', 'NYU' ]
if __name__ == '__main__':
processors = multiprocessing.cpu_count()
for i in range(len(my_list)):
if( i < processors ):
cmd = ["python", "/path/to/create_graphs.py", "--name="+ my_list[i]]
child = subprocess.call( cmd, shell=False )
我真的希望它一次产生 64 个孩子。在其他 *** 问题中,我看到人们使用 Queue,但这似乎会影响性能?
【问题讨论】:
使用 enumerate 代替 range+len 组合 【参考方案1】:您正在寻找的是多处理中的process pool 类。
import multiprocessing
import subprocess
def work(cmd):
return subprocess.call(cmd, shell=False)
if __name__ == '__main__':
count = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=count)
print pool.map(work, ['ls'] * count)
为了便于理解,这里有一个计算示例。以下将在 N 个进程上划分 10000 个任务,其中 N 是 cpu 计数。请注意,我将 None 作为进程数传递。这将导致 Pool 类使用 cpu_count 作为进程数 (reference)
import multiprocessing
import subprocess
def calculate(value):
return value * 10
if __name__ == '__main__':
pool = multiprocessing.Pool(None)
tasks = range(10000)
results = []
r = pool.map_async(calculate, tasks, callback=results.append)
r.wait() # Wait on the results
print results
【讨论】:
如果池中的每个进程都从一个公共队列中拉出一个“请求”,那么它们将保持最大的忙碌状态,直到队列为空。 感谢您的帮助。当我尝试运行您的示例代码时,我收到以下错误:ImportError:此平台缺少正常工作的 sem_open 实现,因此,所需的同步原语将不起作用,请参阅问题 3770。我的服务器是 Sun Sparc(T5220,SunOS 5.10)。我需要如何更改它才能在我的系统上运行? @tatlar,这是一个已知问题bugs.python.org/issue3770 不幸的是我不知道如何解决这个问题。 好的,谢谢。考虑到这一限制,因此最好采用 Jim Robert 提出的解决方案,即。使用下面显示的 threading.Thread()? shell=False 不需要,因为它是默认值。见docs.python.org/2/library/subprocess.html【参考方案2】:这是我根据 Nadia 和 Jim 的 cmets 提出的解决方案。我不确定这是否是最好的方法,但它确实有效。被调用的原始子脚本需要是一个 shell 脚本,因为我需要使用一些 3rd 方应用程序,包括 Matlab。所以我不得不把它从 Python 中拿出来,用 bash 编码。
import sys
import os
import multiprocessing
import subprocess
def work(staname):
print 'Processing station:',staname
print 'Parent process:', os.getppid()
print 'Process id:', os.getpid()
cmd = [ "/bin/bash" "/path/to/executable/create_graphs.sh","--name=%s" % (staname) ]
return subprocess.call(cmd, shell=False)
if __name__ == '__main__':
my_list = [ 'XYZ', 'ABC', 'NYU' ]
my_list.sort()
print my_list
# Get the number of processors available
num_processes = multiprocessing.cpu_count()
threads = []
len_stas = len(my_list)
print "+++ Number of stations to process: %s" % (len_stas)
# run until all the threads are done, and there is no data left
for list_item in my_list:
# if we aren't using all the processors AND there is still data left to
# compute, then spawn another thread
if( len(threads) < num_processes ):
p = multiprocessing.Process(target=work,args=[list_item])
p.start()
print p, p.is_alive()
threads.append(p)
else:
for thread in threads:
if not thread.is_alive():
threads.remove(thread)
这看起来是一个合理的解决方案吗?我尝试使用 Jim 的 while 循环格式,但我的脚本什么也没返回。我不确定为什么会这样。这是我用 Jim 的“while”循环替换“for”循环运行脚本时的输出:
hostnameme2% controller.py
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
hostnameme3%
当我使用“for”循环运行它时,我会得到更有意义的东西:
hostnameme6% controller.py
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
Processing station: ABC
Parent process: 1056
Process id: 1068
Processing station: NYU
Parent process: 1056
Process id: 1069
Processing station: XYZ
Parent process: 1056
Process id: 1071
hostnameme7%
所以这行得通,我很高兴。但是,我仍然不明白为什么我不能使用 Jim 的 'while' 样式循环而不是我正在使用的 'for' 循环。感谢所有帮助 - 我对 @*** 的知识广度印象深刻。
【讨论】:
您需要使用 pop 方法从 my_list 中获取值,该方法返回值并将其从列表中删除。如果你在完成后没有从列表中删除它们,while 循环将永远不会结束。 for 循环也存在问题,因为如果您要处理的数据多于核心,则会达到限制并忽略其余数据(它不会全部处理) oops - 好的,添加了 pop() 方法。但是,我仍然看到我在上面发布的相同输出...... 如果我在 while 循环中添加一个打印语句(以确保我实际上在循环内),我也看不到该输出。就像循环永远不会进入? 我重新审视了这个只是想弄清楚为什么我从来没有让它工作。我想到了。 While 语句仅在所有条件评估为 True 时才会执行。 Jim 的 while 循环永远不会计算为 True,因为列表“线程”将在脚本开始时计算为 False,因为它是空的。这就是为什么我永远无法让 Jim 的示例起作用的原因——我永远无法成功进入 while 循环。【参考方案3】:我肯定会使用multiprocessing,而不是使用子进程滚动我自己的解决方案。
【讨论】:
【参考方案4】:除非您打算从应用程序中获取数据,否则我认为您不需要队列(如果您确实需要数据,我认为将其添加到数据库可能更容易)
但是试穿一下尺寸:
将您的 create_graphs.py 脚本的内容全部放入一个名为“create_graphs”的函数中
import threading
from create_graphs import create_graphs
num_processes = 64
my_list = [ 'XYZ', 'ABC', 'NYU' ]
threads = []
# run until all the threads are done, and there is no data left
while threads or my_list:
# if we aren't using all the processors AND there is still data left to
# compute, then spawn another thread
if (len(threads) < num_processes) and my_list:
t = threading.Thread(target=create_graphs, args=[ my_list.pop() ])
t.setDaemon(True)
t.start()
threads.append(t)
# in the case that we have the maximum number of threads check if any of them
# are done. (also do this when we run out of data, until all the threads are done)
else:
for thread in threads:
if not thread.isAlive():
threads.remove(thread)
我知道这将导致比处理器少 1 个线程,这可能很好,它让处理器来管理线程、磁盘 i/o 以及计算机上发生的其他事情。如果您决定要使用最后一个核心,只需添加一个即可
edit:我想我可能误解了 my_list 的用途。您根本不需要my_list
来跟踪线程(因为它们都被threads
列表中的项目引用)。但这是提供进程输入的好方法 - 甚至更好:使用生成器函数;)
my_list
和threads
的用途
my_list
保存您需要在函数中处理的数据threads
只是当前正在运行的线程的列表
while 循环做了两件事,启动新线程来处理数据,并检查是否有线程完成运行。
因此,只要您有 (a) 更多数据要处理,或者 (b) 线程未完成运行......您想要编程以继续运行。 一旦两个列表都为空,它们将评估为 False
并且 while 循环将退出
【讨论】:
嗨,吉姆。我以前没有见过这种 while 语法。在 while 子句中有两个列表 - 线程和 my_list。这背后的逻辑是什么?我在任何地方都看到过 Python while 循环,它们有明确的小于或其他方式来控制循环。这似乎在这里缺失 - 或者我只是在问一个愚蠢的新手问题?谢谢。 嗨,吉姆 - 感谢您的澄清。在我的代码中,当我像这样设置 which 循环时,只会使我的脚本不返回任何内容(即 which 循环永远不会开始)。如果我将 while 更改为 for 循环,它可以工作:之前:while 线程和 my_list 现在:for list_item in my_list 我想知道我做错了什么。我将在下面发布完整的工作脚本-也许您可以看一下?非常感谢所有的帮助。 当然...在您添加脚本后给我留下一条评论(这样它就会显示在我最近的项目列表中) 添加了脚本。谢谢吉姆! 我重新审视了这个只是想弄清楚为什么我从来没有让它工作。我想到了。 While 语句仅在 ALL 条件评估为 True 时继续。 Jim 的 while 循环永远不会计算为 True,因为列表“线程”将在脚本开始时计算为 False,因为它是空的。这就是为什么我永远无法让 Jim 的示例起作用的原因——我永远无法成功进入 while 循环。以上是关于如何在多处理器系统上生成并行子进程?的主要内容,如果未能解决你的问题,请参考以下文章