multiprocessing.Pool:如何在旧进程完成时启动新进程?
Posted
技术标签:
【中文标题】multiprocessing.Pool:如何在旧进程完成时启动新进程?【英文标题】:multiprocessing.Pool: How to start new processes as old ones finish? 【发布时间】:2019-03-08 02:02:52 【问题描述】:我正在使用多处理池来管理 tesseract 进程(缩微胶片的 OCRing 页面)。很多时候,在一个包含 20 个 tesseract 进程的池中,有几页会更难进行 OCR,因此这些进程比其他进程花费的时间要长得多。同时,池只是挂起,大多数 CPU 都没有被利用。我希望让这些落后者继续下去,但我也想启动更多进程来填满现在闲置的许多其他 CPU,而这几个粘性页面正在完成。我的问题:有没有办法加载新进程来利用那些空闲的 CPU。换句话说,在等待整个池完成之前,可以将池中的空位填满吗?
我可以使用星图的异步版本,然后在当前池已下降到一定数量的活动进程时加载一个新池。但这似乎不优雅。根据需要自动保持在进程中的插槽会更优雅。
这是我的代码现在的样子:
def getMpBatchMap(fileList, commandTemplate, concurrentProcesses):
mpBatchMap = []
for i in range(concurrentProcesses):
fileName = fileList.readline()
if fileName:
mpBatchMap.append((fileName, commandTemplate))
return mpBatchMap
def executeSystemProcesses(objFileName, commandTemplate):
objFileName = objFileName.strip()
logging.debug(objFileName)
objDirName = os.path.dirname(objFileName)
command = commandTemplate.substitute(objFileName=objFileName, objDirName=objDirName)
logging.debug(command)
subprocess.call(command, shell=True)
def process(FILE_LIST_FILENAME, commandTemplateString, concurrentProcesses=3):
"""Go through the list of files and run the provided command against them,
one at a time. Template string maps the terms $objFileName and $objDirName.
Example:
>>> runBatchProcess('convert -scale 256 "$objFileName" "$objDirName/TN.jpg"')
"""
commandTemplate = Template(commandTemplateString)
with open(FILE_LIST_FILENAME) as fileList:
while 1:
# Get a batch of x files to process
mpBatchMap = getMpBatchMap(fileList, commandTemplate, concurrentProcesses)
# Process them
logging.debug('Starting MP batch of %i' % len(mpBatchMap))
if mpBatchMap:
with Pool(concurrentProcesses) as p:
poolResult = p.starmap(executeSystemProcesses, mpBatchMap)
logging.debug('Pool result: %s' % str(poolResult))
else:
break
【问题讨论】:
我不明白。如果进程正在执行 OCR,为什么 CPU 空闲? OCR /应该/受 CPU 限制。而且 AFAIK 不存在您希望的事情。相反,只需增加池大小以便加载系统。 【参考方案1】:你在这里混合了一些东西。池始终保持许多指定的进程处于活动状态。只要您不手动或通过离开上下文管理器的 with 块来关闭池,您就不需要用进程重新填充池,因为它们不会去任何地方。
您可能想说的是“任务”,即这些流程可以处理的任务。任务是您传递给池方法的迭代的每个进程块。是的,有一种方法可以将池中的空闲进程用于新任务在之前所有先前排队的任务都已处理。您已经为此选择了正确的工具,即池方法的异步版本。您所要做的就是重新应用某种异步池方法。
from multiprocessing import Pool
import os
def busy_foo(x):
x = int(x)
for _ in range(x):
x - 1
print(os.getpid(), ' returning: ', x)
return x
if __name__ == '__main__':
arguments1 = zip([222e6, 22e6] * 2)
arguments2 = zip([111e6, 11e6] * 2)
with Pool(4) as pool:
results = pool.starmap_async(busy_foo, arguments1)
results2 = pool.starmap_async(busy_foo, arguments2)
print(results.get())
print(results2.get())
示例输出:
3182 returning: 22000000
3185 returning: 22000000
3185 returning: 11000000
3182 returning: 111000000
3182 returning: 11000000
3185 returning: 111000000
3181 returning: 222000000
3184 returning: 222000000
[222000000, 22000000, 222000000, 22000000]
[111000000, 11000000, 111000000, 11000000]
Process finished with exit code 0
请注意,以更简单的任务结束的进程 3182 和 3185 会立即从第二个参数列表中的任务开始,而无需等待 3181 和 3184 先完成。
如果您出于某种原因真的想在每个进程处理了一定数量的任务后使用新进程,则可以使用 Pool
的 maxtasksperchild
参数。在那里,您可以指定池应在多少任务后用新进程替换旧进程。该参数的默认值为None
,因此池默认不替换进程。
【讨论】:
感谢您回答我的愚蠢问题。我显然对 Pool 的工作原理存在根本性的误解。仔细查看我的代码(我几个月前写的)后发现,我创建了一个名为 getMpBatchMap() 的函数,它明确地分块长度等于并发进程数的参数!我把它撕掉了,现在一切都很好:)再次感谢你!以上是关于multiprocessing.Pool:如何在旧进程完成时启动新进程?的主要内容,如果未能解决你的问题,请参考以下文章
超时后如何中止 multiprocessing.Pool 中的任务?
如何在 Python 中使用 multiprocessing.pool 创建全局锁/信号量?
如果子进程导致分段错误,multiprocessing.Pool 将挂起
Python:如何检查 multiprocessing.Pool 中待处理任务的数量?