始终并行运行恒定数量的子进程

Posted

技术标签:

【中文标题】始终并行运行恒定数量的子进程【英文标题】:Always run a constant number of subprocesses in parallel 【发布时间】:2013-08-10 00:09:25 【问题描述】:

我想使用子进程让 20 个编写脚本的实例并行运行。假设我有一个包含 100.000 个条目的大 URL 列表,我的程序应该控制我的脚本的 20 个实例一直在该列表上工作。我想将其编码如下:

urllist = [url1, url2, url3, .. , url100000]
i=0
while number_of_subproccesses < 20 and i<100000:
    subprocess.Popen(['python', 'script.py', urllist[i]]
    i = i+1

我的脚本只是将一些内容写入数据库或文本文件。它不输出任何内容,也不需要比 url 更多的输入。

我的问题是我无法找到如何获取活动子进程的数量。我是新手程序员,所以欢迎每一个提示和建议。我还想知道一旦加载了 20 个子进程以使 while 循环再次检查条件,我该如何管理它?我想可能会在它上面再放一个while循环,比如

while i<100000
   while number_of_subproccesses < 20:
       subprocess.Popen(['python', 'script.py', urllist[i]]
       i = i+1
       if number_of_subprocesses == 20:
           sleep() # wait to some time until check again

或者,while 循环总是检查子进程的数量的可能性更大?

我也考虑过使用模块多处理,但我发现只调用带有子处理的 script.py 而不是带有多处理的函数真的很方便。

也许有人可以帮助我并引导我走向正确的方向。非常感谢!

【问题讨论】:

相关:Limiting number of processes in multiprocessing python 【参考方案1】:

为了保持恒定数量的并发请求,您可以使用线程池:

#!/usr/bin/env python
from multiprocessing.dummy import Pool

def process_url(url):
    # ... handle a single url

urllist = [url1, url2, url3, .. , url100000]
for _ in Pool(20).imap_unordered(process_url, urllist):
    pass

要运行进程而不是线程,请从导入中删除 .dummy

【讨论】:

【参考方案2】:

采取与上述不同的方法-似乎无法将回调作为参数发送:

NextURLNo = 0
MaxProcesses = 20
MaxUrls = 100000  # Note this would be better to be len(urllist)
Processes = []

def StartNew():
   """ Start a new subprocess if there is work to do """
   global NextURLNo
   global Processes

   if NextURLNo < MaxUrls:
      proc = subprocess.Popen(['python', 'script.py', urllist[NextURLNo], OnExit])
      print ("Started to Process %s", urllist[NextURLNo])
      NextURLNo += 1
      Processes.append(proc)

def CheckRunning():
   """ Check any running processes and start new ones if there are spare slots."""
   global Processes
   global NextURLNo

   for p in range(len(Processes):0:-1): # Check the processes in reverse order
      if Processes[p].poll() is not None: # If the process hasn't finished will return None
         del Processes[p] # Remove from list - this is why we needed reverse order

   while (len(Processes) < MaxProcesses) and (NextURLNo < MaxUrls): # More to do and some spare slots
      StartNew()

if __name__ == "__main__":
   CheckRunning() # This will start the max processes running
   while (len(Processes) > 0): # Some thing still going on.
      time.sleep(0.1) # You may wish to change the time for this
      CheckRunning()

   print ("Done!")

【讨论】:

再次感谢您的回答!一些小问题。 len(Processes):0:-1 做了什么以及为什么它不足以在 Processes: 中为 p 编写?我猜在主程序中,它应该是:while (len(Processes) > 0): ?再次感谢您的努力,这是一个非常有创意的答案! [len(Processes):0:-1] 将以相反的顺序遍历进程列表,以避免在遍历列表时从列表中删除而出现问题。我会更正另一个。 Processes[p].poll() 仍然存在一个小问题,它说“列表索引必须是整数,而不是 popen”。那么是否可以在 Processes 列表中获取我们的 p 的索引?也许像 Processes.index(p) 之类的? 如果我用 == 0 扩展 if Processes[p].poll(),一切正常!再次非常感谢! 这太棒了!我还必须将 processes[p].poll() 更改为 processes[p].poll() == 0 并且对于反向范围,我只使用了 reversed(range(len(processes)))。谢谢!【参考方案3】:

在启动它们时保持计数,如果有任何要处理的 url 列表条目,则使用每个子进程的回调来启动一个新的。

例如假设您的子进程在结束时调用传递给它的 OnExit 方法:

NextURLNo = 0
MaxProcesses = 20
NoSubProcess = 0
MaxUrls = 100000

def StartNew():
   """ Start a new subprocess if there is work to do """
   global NextURLNo
   global NoSubProcess

   if NextURLNo < MaxUrls:
      subprocess.Popen(['python', 'script.py', urllist[NextURLNo], OnExit])
      print "Started to Process", urllist[NextURLNo]
      NextURLNo += 1
      NoSubProcess += 1

def OnExit():
   NoSubProcess -= 1

if __name__ == "__main__":
   for n in range(MaxProcesses):
      StartNew()
   while (NoSubProcess > 0):
      time.sleep(1)
      if (NextURLNo < MaxUrls):
         for n in range(NoSubProcess,MaxProcesses):
             StartNew()

【讨论】:

哇,非常感谢您为我的问题提供了完整的解决方案。这对我有很大帮助!回调的想法非常好用。感谢您的努力! 再问一个小问题:我尝试了你的代码,但不知何故它给了我一个 Typerror:在 subprocess.Popen 行中,'function' 类型的参数是不可迭代的。我知道这与调用 OnExit 函数有关,但这是什么意思,我该如何防止这种情况发生?也许我在 Windows7 上使用 Python3.3 获取信息。 哦,我想我明白了,我必须实现在我的 script.py 中运行传递的 OnExit。对吗? 似乎无法在参数列表中传递函数,还是我错了?有什么解决方法吗?

以上是关于始终并行运行恒定数量的子进程的主要内容,如果未能解决你的问题,请参考以下文章

Shell脚本入门 07:进程与信号

Shell脚本入门 07:进程与信号

创建一个不是创建进程子进程的新进程

golang 热重启

Linux学习-进程管理

如何找到理想数量的并行进程以使用 python 多处理运行?