显示 Python 多处理池 imap_unordered 调用的进度?

Posted

技术标签:

【中文标题】显示 Python 多处理池 imap_unordered 调用的进度?【英文标题】:Show the progress of a Python multiprocessing pool imap_unordered call? 【发布时间】:2011-08-05 17:05:45 【问题描述】:

我有一个脚本,它通过imap_unordered() 调用成功地执行了一组多处理池任务:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion

但是,我的num_tasks 大约是 250,000,因此 join() 将主线程锁定 10 秒左右,我希望能够逐步回显到命令行以显示主进程未锁定。比如:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
  if (remaining == 0): break # Jump out of while loop
  print("Waiting for", remaining, "tasks to complete...")
  time.sleep(2)

结果对象或池本身是否有指示剩余任务数的方法?我尝试使用multiprocessing.Value 对象作为计数器(do_work 在完成任务后调用counter.value += 1 操作),但计数器在停止递增之前仅达到总值的 85%。

【问题讨论】:

【参考方案1】:

快速入门

使用tqdmmultiprocessing.Pool

安装

pip install tqdm

例子

import time
import threading
from multiprocessing import Pool

from tqdm import tqdm


def do_work(x):
    time.sleep(x)
    return x


def progress():
    time.sleep(3)  # Check progress after 3 seconds
    print(f'total: pbar.total finish:pbar.n')


tasks = range(10)
pbar = tqdm(total=len(tasks))

if __name__ == '__main__':
    thread = threading.Thread(target=progress)
    thread.start()
    results = []
    with Pool(processes=5) as pool:
        for result in pool.imap_unordered(do_work, tasks):
            results.append(result)
            pbar.update(1)
    print(results)

结果

烧瓶

安装

pip install flask

main.py

import time
from multiprocessing import Pool

from tqdm import tqdm
from flask import Flask, make_response, jsonify

app = Flask(__name__)


def do_work(x):
    time.sleep(x)
    return x


total = 5  # num of tasks
tasks = range(total)
pbar = tqdm(total=len(tasks))


@app.route('/run/')
def run():
    results = []
    with Pool(processes=2) as pool:
        for _result in pool.imap_unordered(do_work, tasks):
            results.append(_result)
            if pbar.n >= total:
                pbar.n = 0  # reset
            pbar.update(1)
    response = make_response(jsonify(dict(results=results)))
    response.headers.add('Access-Control-Allow-Origin', '*')
    response.headers.add('Access-Control-Allow-Headers', '*')
    response.headers.add('Access-Control-Allow-Methods', '*')
    return response


@app.route('/progress/')
def progress():
    response = make_response(jsonify(dict(n=pbar.n, total=pbar.total)))
    response.headers.add('Access-Control-Allow-Origin', '*')
    response.headers.add('Access-Control-Allow-Headers', '*')
    response.headers.add('Access-Control-Allow-Methods', '*')
    return response

运行(例如在 Windows 中)

set FLASK_APP=main
flask run

API 列表

运行任务:http://127.0.0.1:5000/run/ 显示进度:http://127.0.0.1:5000/progress/

test.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Progress Bar</title>
    <script src="https://cdn.bootcss.com/jquery/3.0.0/jquery.min.js"></script>
    <script src="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/js/bootstrap.min.js"></script>
    <link href="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/css/bootstrap.min.css" rel="stylesheet">
</head>
<body>
<button id="run">Run the task</button>
<br><br>
<div class="progress">
    <div class="progress-bar" role="progressbar" aria-valuenow="1" aria-valuemin="0" aria-valuemax="100"
         style="width: 10%">0.00%
    </div>
</div>
</body>
<script>
    function set_progress_rate(n, total) 
        //Set the rate of progress bar
        var rate = (n / total * 100).toFixed(2);
        if (n > 0) 
            $(".progress-bar").attr("aria-valuenow", n);
            $(".progress-bar").attr("aria-valuemax", total);
            $(".progress-bar").text(rate + "%");
            $(".progress-bar").css("width", rate + "%");
        
    

    $("#run").click(function () 
        //Run the task
        $.ajax(
            url: "http://127.0.0.1:5000/run/",
            type: "GET",
            success: function (response) 
                set_progress_rate(100, 100);
                console.log('Results:' + response['results']);
            
        );
    );
    setInterval(function () 
        //Show progress every 1 second
        $.ajax(
            url: "http://127.0.0.1:5000/progress/",
            type: "GET",
            success: function (response) 
                console.log(response);
                var n = response["n"];
                var total = response["total"];
                set_progress_rate(n, total);
            
        );
    , 1000);
</script>
</html>

结果

【讨论】:

【参考方案2】:

一些答案​​适用于进度条,但我无法从池中得到结果

我使用tqdm 创建进度条 你可以通过pip install tqdm安装它

下面的简单代码与进度条配合得很好,你也可以得到结果:

from multiprocessing import Pool
from tqdm import tqdm
from time import sleep

tasks = range(5)
result = []

def do_work(x):
    # do something with x and return the result
    sleep(2)
    return x + 2

if __name__ == '__main__':
    pbar = tqdm(total=len(tasks))

    with Pool(2) as p:
        for i in p.imap_unordered(do_work, tasks):

            result.append(i)
            pbar.update(i)
    
    pbar.close()
    print(result)

【讨论】:

【参考方案3】:

在做了一些研究之后,我写了一个名为parallelbar 的小模块。它允许您分别显示池的整体进度和每个核心的进度。 它易于使用并且有很好的描述。

例如:

from parallelbar import progress_map
from parallelbar.tools import cpu_bench


if __name__=='__main__':
    # create list of task
    tasks = [1_000_000 + i for i in range(100)]
    progress_map(cpu_bench, tasks)

【讨论】:

【参考方案4】:

Pool.apply_async() 的简单解决方案:

from multiprocessing import Pool
from tqdm import tqdm
from time import sleep


def work(x):
    sleep(0.2)
    return x**2


n = 10

with Pool(4) as p, tqdm(total=n) as pbar:
    res = [p.apply_async(
        work, args=(i,), callback=lambda _: pbar.update(1)) for i in range(n)]
    results = [r.get() for r in res]

【讨论】:

完成后应该关闭 Pool 和 pbar 可能希望避免在最后一行中对池和迭代器使用 varname p【参考方案5】:

按照 Tim 的建议,您可以使用 tqdmimap 来解决此问题。我刚刚偶然发现了这个问题并调整了imap_unordered 解决方案,以便我可以访问映射的结果。以下是它的工作原理:

from multiprocessing import Pool
import tqdm

pool = multiprocessing.Pool(processes=4)
mapped_values = list(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)), total=len(values)))

如果您不关心作业返回的值,则无需将列表分配给任何变量。

【讨论】:

这是最好的答案。在任务完成时显示进度并返回结果。【参考方案6】:

试试这个简单的基于队列的方法,它也可以与池一起使用。请注意,在进度条启动后打印任何内容都会导致它被移动,至少对于这个特定的进度条。 (PyPI的进度1.5)

import time
from progress.bar import Bar

def status_bar( queue_stat, n_groups, n ):

    bar = Bar('progress', max = n)  

    finished = 0
    while finished < n_groups:

        while queue_stat.empty():
            time.sleep(0.01)

        gotten = queue_stat.get()
        if gotten == 'finished':
            finished += 1
        else:
            bar.next()
    bar.finish()


def process_data( queue_data, queue_stat, group):

    for i in group:

        ... do stuff resulting in new_data

        queue_stat.put(1)

    queue_stat.put('finished')  
    queue_data.put(new_data)

def multiprocess():

    new_data = []

    groups = [[1,2,3],[4,5,6],[7,8,9]]
    combined = sum(groups,[])

    queue_data = multiprocessing.Queue()
    queue_stat = multiprocessing.Queue()

    for i, group in enumerate(groups): 

        if i == 0:

            p = multiprocessing.Process(target = status_bar,
                args=(queue_stat,len(groups),len(combined)))
                processes.append(p)
                p.start()

        p = multiprocessing.Process(target = process_data,
        args=(queue_data, queue_stat, group))
        processes.append(p)
        p.start()

    for i in range(len(groups)):
        data = queue_data.get() 
        new_data += data

    for p in processes:
        p.join()

【讨论】:

【参考方案7】:

我发现在我尝试检查进度时工作已经完成。这就是使用 tqdm 对我有用的方法。

pip install tqdm

from multiprocessing import Pool
from tqdm import tqdm

tasks = range(5)
pool = Pool()
pbar = tqdm(total=len(tasks))

def do_work(x):
    # do something with x
    pbar.update(1)

pool.imap_unordered(do_work, tasks)
pool.close()
pool.join()
pbar.close()

这应该适用于所有类型的多处理,无论它们是否阻塞。

【讨论】:

我认为创建了一堆线程,每个线程都是独立计数的 我的函数中有函数导致酸洗错误。 这不会为我创建一个进度条,但它有点工作。它计算迭代次数(并显示总的预期迭代次数)。尽管由于线程的原因计数会上升和下降(我猜),但在任何时候都或多或少地看到它的位置并不难。到目前为止,这对我来说最有效(我必须使用返回值,这会使其他答案复杂化)。【参考方案8】:

我个人最喜欢的——在事情并行运行和提交时,给你一个漂亮的小进度条和完成 ETA。

from multiprocessing import Pool
import tqdm

pool = Pool(processes=8)
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
    pass

【讨论】:

如果池返回值怎么办? 我在循环之前创建了一个名为 result 的空列表,然后在循环内只执行 result.append(x)。我用 2 个进程尝试了这个,并使用 imap 而不是 map,一切都按照我想要的方式运行 @nickpick 所以我的进度条正在迭代新行而不是原地进行,知道为什么会这样吗? 别忘了把这段代码包装在if __name__ == "__main__":中,否则它可能会莫名其妙地不起作用 @bs7280 你所说的 result.append(x) 是指 result.append(_) 吗? x是什么?【参考方案9】:

我创建了一个自定义类来创建进度打印输出。也许这会有所帮助:

from multiprocessing import Pool, cpu_count


class ParallelSim(object):
    def __init__(self, processes=cpu_count()):
        self.pool = Pool(processes=processes)
        self.total_processes = 0
        self.completed_processes = 0
        self.results = []

    def add(self, func, args):
        self.pool.apply_async(func=func, args=args, callback=self.complete)
        self.total_processes += 1

    def complete(self, result):
        self.results.extend(result)
        self.completed_processes += 1
        print('Progress: :.2f%'.format((self.completed_processes/self.total_processes)*100))

    def run(self):
        self.pool.close()
        self.pool.join()

    def get_results(self):
        return self.results

【讨论】:

【参考方案10】:

通过更多挖掘自己找到了答案:查看imap_unordered 结果对象的__dict__,我发现它有一个_index 属性,该属性随着每个任务完成而递增。所以这适用于日志记录,包裹在 while 循环中:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  completed = rs._index
  if (completed == num_tasks): break
  print "Waiting for", num_tasks-completed, "tasks to complete..."
  time.sleep(2)

但是,我确实发现将imap_unordered 替换为map_async 会导致执行速度更快,尽管结果对象有点不同。相反,来自map_async 的结果对象具有_number_left 属性和ready() 方法:

p = multiprocessing.Pool()
rs = p.map_async(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  if (rs.ready()): break
  remaining = rs._number_left
  print "Waiting for", remaining, "tasks to complete..."
  time.sleep(0.5)

【讨论】:

我在 Python 2.7.6 上对此进行了测试,rs._number_left 似乎是剩余的块数。因此,如果 rs._chunksize 不是 1,那么 rs._number_left 将不是剩余的列表项数。 我应该把这段代码放在哪里?我的意思是直到知道rs 的内容之后才执行它并且有点晚了? @WakanTanka:它在分离额外线程后进入主脚本。在我原来的例子中,它进入了“while”循环,rs 已经启动了其他线程。 您能否编辑您的问题和/或答案以显示最低工作示例。我在任何循环中都看不到rs,我是多处理新手,这会有所帮助。非常感谢。 至少在python 3.5 中,使用_number_left 的解决方案不起作用。 _number_left 表示仍有待处理的块。例如,如果我想将 50 个元素并行传递给我的函数,那么对于具有 3 个进程的线程池 _map_async() 创建 10 个块,每个块包含 5 个元素。 _number_left 然后表示这些块中有多少已经完成。【参考方案11】:

我知道这是一个相当老的问题,但是当我想在 python 中跟踪任务池的进展时,我正在做这件事。

from progressbar import ProgressBar, SimpleProgress
import multiprocessing as mp
from time import sleep

def my_function(letter):
    sleep(2)
    return letter+letter

dummy_args = ["A", "B", "C", "D"]
pool = mp.Pool(processes=2)

results = []

pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start()

r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]

while len(results) != len(dummy_args):
    pbar.update(len(results))
    sleep(0.5)
pbar.finish()

print results

基本上,您将 apply_async 与 callbak 一起使用(在这种情况下,它将返回的值附加到列表中),因此您不必等待执行其他操作。然后,在一个 while 循环中,您检查工作的进度。在这种情况下,我添加了一个小部件以使其看起来更好。

输出:

4 of 4                                                                         
['AA', 'BB', 'CC', 'DD']

希望对你有帮助。

【讨论】:

必须更改:[pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args](pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args) 这不是真的。生成器对象在这里不起作用。已检查。【参考方案12】:

不需要访问结果集的私有属性:

from __future__ import division
import sys

for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
    sys.stderr.write('\rdone 0:%'.format(i/num_tasks))

【讨论】:

我只在代码退出后看到打印输出(不是每次迭代)。你有什么建议吗? @HananShteingart:它在我的系统(Ubuntu)上运行良好,同时使用 Python 2 和 3。我以 def do_word(*a): time.sleep(.1) 为例。如果它对您不起作用,则创建一个 complete minimal code example 来演示您的问题:使用文字描述您期望发生什么以及会发生什么,提及您如何运行 Python 脚本、您的操作系统、Python 版本和post it as a new question. 我遇到了和@HananShteingart 一样的问题:这是因为我尝试使用Pool.map()。我没有意识到 only imap()imap_unordered() 以这种方式工作 - 文档只是说“map() 的惰性版本”,​​但实际上意味着“底层迭代器返回结果,因为它们进来”。 @simonmacmullen:问题和我的答案都使用imap_unordered()。 Hanan 的问题可能是由于sys.stderr.write('\r..')(覆盖同一行以显示进度)。 也可以!我主要是想记录下我所做的一个愚蠢的假设——以防其他阅读这篇文章的人也做到了。

以上是关于显示 Python 多处理池 imap_unordered 调用的进度?的主要内容,如果未能解决你的问题,请参考以下文章

进程池Pool的imap方法解析

多处理:池:直接迭代器或使用变量来存储迭代器

我可以将 map / imap / imap_unordered 与不带参数的函数一起使用吗?

如何在python 2.7中使用pymongo进行多处理池

Python多处理:如何在异常时关闭多处理池

python的多处理池的键盘中断