Python:如何检查 multiprocessing.Pool 中待处理任务的数量?

Posted

技术标签:

【中文标题】Python:如何检查 multiprocessing.Pool 中待处理任务的数量?【英文标题】:Python: How can I check the number of pending tasks in a multiprocessing.Pool? 【发布时间】:2011-07-29 09:50:35 【问题描述】:

我有一小部分工人 (4) 和一个非常大的任务列表 (5000~)。我正在使用一个池并使用 map_async() 发送任务。因为我正在运行的任务相当长,所以我强制将块大小设置为 1,这样一个长进程就无法容纳一些较短的进程。

我想做的是定期检查还有多少任务需要提交。我知道最多有 4 个处于活动状态,我关心还有多少需要处理。

我用谷歌搜索过,找不到任何人这样做。

一些简单的帮助代码:

import multiprocessing
import time

def mytask(num):
    print('Started task, sleeping %s' % num)
    time.sleep(num)

pool = multiprocessing.Pool(4)
jobs = pool.map_async(mytask, [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4], chunksize=1)
pool.close()

while True:
    if not jobs.ready():
        print("We're not done yet, %s tasks to go!" % <somethingtogettasks>)
        jobs.wait(2)
    else:
        break

【问题讨论】:

我应该注意到我在 RHEL-6 系统上使用 python2.6,但是我对不同版本/平台上的示例持开放态度。 任务完成时减少的静态变量? (当任务明显开始时增加)。 在工作人员到达任务之前,任务不会“开始”。我想如果我创建了一个与要完成的任务大小相同的全局变量,然后每次启动可能会执行此操作的任务时将其递减,但这有点尴尬并且需要考虑线程安全。 更改以获取示例代码以编译和运行:fpaste.org/p4Hb。另外:gist.github.com/902947 谢谢亚当,我已经让上面的代码工作了。 【参考方案1】:

据我所知没有密闭的方式,但是如果你使用Pool.imap_unordered()函数而不是map_async,你可以截取被处理的元素。

import multiprocessing
import time

process_count = 4

def mytask(num):
    print('Started task, sleeping %s' % num)
    time.sleep(num)
    # Actually, you should return the job you've created here.
    return num

pool = multiprocess.Pool(process_count)
jobs  = []
items = [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4]
job_count = 0
for job in pool.imap_unordered(mytask, items):
    jobs.append(job)
    job_count += 1

    incomplete = len(items) - job_count
    unsubmitted = max(0, incomplete - process_count)

    print "Jobs incomplete: %s. Unsubmitted: %s" % incomplete, unsubmitted

pool.close()

我要减去 process_count,因为您几乎可以假设所有进程都将处理以下两个例外之一:1) 如果您使用迭代器,则可能没有更多的项目可以使用和处理,并且2) 您的剩余物品可能少于 4 件。我没有为第一个异常编写代码。但是,如果您需要,这样做应该很容易。无论如何,你的例子使用了一个列表,所以你不应该有这个问题。

编辑:我还意识到您正在使用 While 循环,这使您看起来像是在尝试定期更新某些内容,例如每半秒或某事。我作为示例给出的代码不会那样做。我不确定这是否有问题。

【讨论】:

谢谢。我还没有真正探索过 imap 函数(文档有点……简洁)。不过你说得对,我想在工作进行时做一些其他事情,并定期报告剩余的工作量。【参考方案2】:

看起来jobs._number_left 是您想要的。 _ 表示它是一个内部值,可能会随开发人员的心血来潮而改变,但它似乎是获取该信息的唯一方法。

【讨论】:

啊!它不在 API 文档中,我忘记在 ipython 中的作业上执行 dir()。感谢您的回答! API 文档中没有 _number_left 是否有充分的理由?它是否会被弃用或在未来更改名称?【参考方案3】:

我有类似的要求:跟踪进度,根据结果执行临时工作,在任意时间干净地停止所有处理。我的处理方式是使用apply_async 一次发送一个任务。我所做的非常简化的版本:

maxProcesses = 4
q = multiprocessing.Queue()
pool = multiprocessing.Pool()
runlist = range(100000)
sendcounter = 0
donecounter = 0
while donecounter < len(runlist):
    if stopNowBooleanFunc():  # if for whatever reason I want to stop processing early
        if donecounter == sendcounter:  # wait til already sent tasks finish running
            break
    else:  # don't send new tasks if it's time to stop
        while sendcounter < len(runlist) and sendcounter - donecounter < maxProcesses:
            pool.apply_async(mytask, (runlist[sendcounter], q))
            sendcounter += 1

    while not q.empty():  # process completed results as they arrive
        aresult = q.get()
        processResults(aresult)
        donecounter += 1

请注意,我使用Queue 而不是return 来获取结果。

【讨论】:

【参考方案4】:

假设您使用的是apply_async,您可以通过查看Pool._cache 属性来检查待处理作业的数量。这是 ApplyResult 存储的位置,直到它们可用并且等于待处理的 ApplyResults 的数量。

import multiprocessing as mp
import random
import time


def job():
    time.sleep(random.randint(1,10))
    print("job finished")

if __name__ == '__main__':
    pool = mp.Pool(5)
    for _ in range(10):
        pool.apply_async(job)

    while pool._cache:
        print("number of jobs pending: ", len(pool._cache))
        time.sleep(2)

    pool.close()
    pool.join()

【讨论】:

以上是关于Python:如何检查 multiprocessing.Pool 中待处理任务的数量?的主要内容,如果未能解决你的问题,请参考以下文章

如何检查运行我的脚本的 Python 版本?

如何检查python中是不是存在文件? [复制]

如何进行PYTHON语法检查

Python 究竟是如何检查列表的?

如何检查Python中是不是存在方法?

如何检查变量是不是是Python中的字典?