始终并行运行恒定数量的子进程
Posted
技术标签:
【中文标题】始终并行运行恒定数量的子进程【英文标题】:Always run a constant number of subprocesses in parallel 【发布时间】:2013-08-10 00:09:25 【问题描述】:我想使用子进程让 20 个编写脚本的实例并行运行。假设我有一个包含 100.000 个条目的大 URL 列表,我的程序应该控制我的脚本的 20 个实例一直在该列表上工作。我想将其编码如下:
urllist = [url1, url2, url3, .. , url100000]
i=0
while number_of_subproccesses < 20 and i<100000:
subprocess.Popen(['python', 'script.py', urllist[i]]
i = i+1
我的脚本只是将一些内容写入数据库或文本文件。它不输出任何内容,也不需要比 url 更多的输入。
我的问题是我无法找到如何获取活动子进程的数量。我是新手程序员,所以欢迎每一个提示和建议。我还想知道一旦加载了 20 个子进程以使 while 循环再次检查条件,我该如何管理它?我想可能会在它上面再放一个while循环,比如
while i<100000
while number_of_subproccesses < 20:
subprocess.Popen(['python', 'script.py', urllist[i]]
i = i+1
if number_of_subprocesses == 20:
sleep() # wait to some time until check again
或者,while 循环总是检查子进程的数量的可能性更大?
我也考虑过使用模块多处理,但我发现只调用带有子处理的 script.py 而不是带有多处理的函数真的很方便。
也许有人可以帮助我并引导我走向正确的方向。非常感谢!
【问题讨论】:
相关:Limiting number of processes in multiprocessing python 【参考方案1】:为了保持恒定数量的并发请求,您可以使用线程池:
#!/usr/bin/env python
from multiprocessing.dummy import Pool
def process_url(url):
# ... handle a single url
urllist = [url1, url2, url3, .. , url100000]
for _ in Pool(20).imap_unordered(process_url, urllist):
pass
要运行进程而不是线程,请从导入中删除 .dummy
。
【讨论】:
【参考方案2】:采取与上述不同的方法-似乎无法将回调作为参数发送:
NextURLNo = 0
MaxProcesses = 20
MaxUrls = 100000 # Note this would be better to be len(urllist)
Processes = []
def StartNew():
""" Start a new subprocess if there is work to do """
global NextURLNo
global Processes
if NextURLNo < MaxUrls:
proc = subprocess.Popen(['python', 'script.py', urllist[NextURLNo], OnExit])
print ("Started to Process %s", urllist[NextURLNo])
NextURLNo += 1
Processes.append(proc)
def CheckRunning():
""" Check any running processes and start new ones if there are spare slots."""
global Processes
global NextURLNo
for p in range(len(Processes):0:-1): # Check the processes in reverse order
if Processes[p].poll() is not None: # If the process hasn't finished will return None
del Processes[p] # Remove from list - this is why we needed reverse order
while (len(Processes) < MaxProcesses) and (NextURLNo < MaxUrls): # More to do and some spare slots
StartNew()
if __name__ == "__main__":
CheckRunning() # This will start the max processes running
while (len(Processes) > 0): # Some thing still going on.
time.sleep(0.1) # You may wish to change the time for this
CheckRunning()
print ("Done!")
【讨论】:
再次感谢您的回答!一些小问题。 len(Processes):0:-1 做了什么以及为什么它不足以在 Processes: 中为 p 编写?我猜在主程序中,它应该是:while (len(Processes) > 0): ?再次感谢您的努力,这是一个非常有创意的答案![len(Processes):0:-1]
将以相反的顺序遍历进程列表,以避免在遍历列表时从列表中删除而出现问题。我会更正另一个。
Processes[p].poll() 仍然存在一个小问题,它说“列表索引必须是整数,而不是 popen”。那么是否可以在 Processes 列表中获取我们的 p 的索引?也许像 Processes.index(p) 之类的?
如果我用 == 0 扩展 if Processes[p].poll(),一切正常!再次非常感谢!
这太棒了!我还必须将 processes[p].poll() 更改为 processes[p].poll() == 0 并且对于反向范围,我只使用了 reversed(range(len(processes)))。谢谢!【参考方案3】:
在启动它们时保持计数,如果有任何要处理的 url 列表条目,则使用每个子进程的回调来启动一个新的。
例如假设您的子进程在结束时调用传递给它的 OnExit 方法:
NextURLNo = 0
MaxProcesses = 20
NoSubProcess = 0
MaxUrls = 100000
def StartNew():
""" Start a new subprocess if there is work to do """
global NextURLNo
global NoSubProcess
if NextURLNo < MaxUrls:
subprocess.Popen(['python', 'script.py', urllist[NextURLNo], OnExit])
print "Started to Process", urllist[NextURLNo]
NextURLNo += 1
NoSubProcess += 1
def OnExit():
NoSubProcess -= 1
if __name__ == "__main__":
for n in range(MaxProcesses):
StartNew()
while (NoSubProcess > 0):
time.sleep(1)
if (NextURLNo < MaxUrls):
for n in range(NoSubProcess,MaxProcesses):
StartNew()
【讨论】:
哇,非常感谢您为我的问题提供了完整的解决方案。这对我有很大帮助!回调的想法非常好用。感谢您的努力! 再问一个小问题:我尝试了你的代码,但不知何故它给了我一个 Typerror:在 subprocess.Popen 行中,'function' 类型的参数是不可迭代的。我知道这与调用 OnExit 函数有关,但这是什么意思,我该如何防止这种情况发生?也许我在 Windows7 上使用 Python3.3 获取信息。 哦,我想我明白了,我必须实现在我的 script.py 中运行传递的 OnExit。对吗? 似乎无法在参数列表中传递函数,还是我错了?有什么解决方法吗?以上是关于始终并行运行恒定数量的子进程的主要内容,如果未能解决你的问题,请参考以下文章