Python:如何检查 multiprocessing.Pool 中待处理任务的数量?
Posted
技术标签:
【中文标题】Python:如何检查 multiprocessing.Pool 中待处理任务的数量?【英文标题】:Python: How can I check the number of pending tasks in a multiprocessing.Pool? 【发布时间】:2011-07-29 09:50:35 【问题描述】:我有一小部分工人 (4) 和一个非常大的任务列表 (5000~)。我正在使用一个池并使用 map_async() 发送任务。因为我正在运行的任务相当长,所以我强制将块大小设置为 1,这样一个长进程就无法容纳一些较短的进程。
我想做的是定期检查还有多少任务需要提交。我知道最多有 4 个处于活动状态,我关心还有多少需要处理。
我用谷歌搜索过,找不到任何人这样做。
一些简单的帮助代码:
import multiprocessing
import time
def mytask(num):
print('Started task, sleeping %s' % num)
time.sleep(num)
pool = multiprocessing.Pool(4)
jobs = pool.map_async(mytask, [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4], chunksize=1)
pool.close()
while True:
if not jobs.ready():
print("We're not done yet, %s tasks to go!" % <somethingtogettasks>)
jobs.wait(2)
else:
break
【问题讨论】:
我应该注意到我在 RHEL-6 系统上使用 python2.6,但是我对不同版本/平台上的示例持开放态度。 任务完成时减少的静态变量? (当任务明显开始时增加)。 在工作人员到达任务之前,任务不会“开始”。我想如果我创建了一个与要完成的任务大小相同的全局变量,然后每次启动可能会执行此操作的任务时将其递减,但这有点尴尬并且需要考虑线程安全。 更改以获取示例代码以编译和运行:fpaste.org/p4Hb。另外:gist.github.com/902947 谢谢亚当,我已经让上面的代码工作了。 【参考方案1】:据我所知没有密闭的方式,但是如果你使用Pool.imap_unordered()
函数而不是map_async,你可以截取被处理的元素。
import multiprocessing
import time
process_count = 4
def mytask(num):
print('Started task, sleeping %s' % num)
time.sleep(num)
# Actually, you should return the job you've created here.
return num
pool = multiprocess.Pool(process_count)
jobs = []
items = [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4]
job_count = 0
for job in pool.imap_unordered(mytask, items):
jobs.append(job)
job_count += 1
incomplete = len(items) - job_count
unsubmitted = max(0, incomplete - process_count)
print "Jobs incomplete: %s. Unsubmitted: %s" % incomplete, unsubmitted
pool.close()
我要减去 process_count
,因为您几乎可以假设所有进程都将处理以下两个例外之一:1) 如果您使用迭代器,则可能没有更多的项目可以使用和处理,并且2) 您的剩余物品可能少于 4 件。我没有为第一个异常编写代码。但是,如果您需要,这样做应该很容易。无论如何,你的例子使用了一个列表,所以你不应该有这个问题。
编辑:我还意识到您正在使用 While 循环,这使您看起来像是在尝试定期更新某些内容,例如每半秒或某事。我作为示例给出的代码不会那样做。我不确定这是否有问题。
【讨论】:
谢谢。我还没有真正探索过 imap 函数(文档有点……简洁)。不过你说得对,我想在工作进行时做一些其他事情,并定期报告剩余的工作量。【参考方案2】:看起来jobs._number_left
是您想要的。 _
表示它是一个内部值,可能会随开发人员的心血来潮而改变,但它似乎是获取该信息的唯一方法。
【讨论】:
啊!它不在 API 文档中,我忘记在 ipython 中的作业上执行 dir()。感谢您的回答! API 文档中没有 _number_left 是否有充分的理由?它是否会被弃用或在未来更改名称?【参考方案3】:我有类似的要求:跟踪进度,根据结果执行临时工作,在任意时间干净地停止所有处理。我的处理方式是使用apply_async
一次发送一个任务。我所做的非常简化的版本:
maxProcesses = 4
q = multiprocessing.Queue()
pool = multiprocessing.Pool()
runlist = range(100000)
sendcounter = 0
donecounter = 0
while donecounter < len(runlist):
if stopNowBooleanFunc(): # if for whatever reason I want to stop processing early
if donecounter == sendcounter: # wait til already sent tasks finish running
break
else: # don't send new tasks if it's time to stop
while sendcounter < len(runlist) and sendcounter - donecounter < maxProcesses:
pool.apply_async(mytask, (runlist[sendcounter], q))
sendcounter += 1
while not q.empty(): # process completed results as they arrive
aresult = q.get()
processResults(aresult)
donecounter += 1
请注意,我使用Queue
而不是return
来获取结果。
【讨论】:
【参考方案4】:假设您使用的是apply_async
,您可以通过查看Pool._cache
属性来检查待处理作业的数量。这是 ApplyResult
存储的位置,直到它们可用并且等于待处理的 ApplyResult
s 的数量。
import multiprocessing as mp
import random
import time
def job():
time.sleep(random.randint(1,10))
print("job finished")
if __name__ == '__main__':
pool = mp.Pool(5)
for _ in range(10):
pool.apply_async(job)
while pool._cache:
print("number of jobs pending: ", len(pool._cache))
time.sleep(2)
pool.close()
pool.join()
【讨论】:
以上是关于Python:如何检查 multiprocessing.Pool 中待处理任务的数量?的主要内容,如果未能解决你的问题,请参考以下文章