显示 Python 多处理池 imap_unordered 调用的进度?
Posted
技术标签:
【中文标题】显示 Python 多处理池 imap_unordered 调用的进度?【英文标题】:Show the progress of a Python multiprocessing pool imap_unordered call? 【发布时间】:2011-08-05 17:05:45 【问题描述】:我有一个脚本,它通过imap_unordered()
调用成功地执行了一组多处理池任务:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion
但是,我的num_tasks
大约是 250,000,因此 join()
将主线程锁定 10 秒左右,我希望能够逐步回显到命令行以显示主进程未锁定。比如:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
if (remaining == 0): break # Jump out of while loop
print("Waiting for", remaining, "tasks to complete...")
time.sleep(2)
结果对象或池本身是否有指示剩余任务数的方法?我尝试使用multiprocessing.Value
对象作为计数器(do_work
在完成任务后调用counter.value += 1
操作),但计数器在停止递增之前仅达到总值的 85%。
【问题讨论】:
【参考方案1】:快速入门
使用tqdm
和multiprocessing.Pool
安装
pip install tqdm
例子
import time
import threading
from multiprocessing import Pool
from tqdm import tqdm
def do_work(x):
time.sleep(x)
return x
def progress():
time.sleep(3) # Check progress after 3 seconds
print(f'total: pbar.total finish:pbar.n')
tasks = range(10)
pbar = tqdm(total=len(tasks))
if __name__ == '__main__':
thread = threading.Thread(target=progress)
thread.start()
results = []
with Pool(processes=5) as pool:
for result in pool.imap_unordered(do_work, tasks):
results.append(result)
pbar.update(1)
print(results)
结果
烧瓶
安装
pip install flask
main.py
import time
from multiprocessing import Pool
from tqdm import tqdm
from flask import Flask, make_response, jsonify
app = Flask(__name__)
def do_work(x):
time.sleep(x)
return x
total = 5 # num of tasks
tasks = range(total)
pbar = tqdm(total=len(tasks))
@app.route('/run/')
def run():
results = []
with Pool(processes=2) as pool:
for _result in pool.imap_unordered(do_work, tasks):
results.append(_result)
if pbar.n >= total:
pbar.n = 0 # reset
pbar.update(1)
response = make_response(jsonify(dict(results=results)))
response.headers.add('Access-Control-Allow-Origin', '*')
response.headers.add('Access-Control-Allow-Headers', '*')
response.headers.add('Access-Control-Allow-Methods', '*')
return response
@app.route('/progress/')
def progress():
response = make_response(jsonify(dict(n=pbar.n, total=pbar.total)))
response.headers.add('Access-Control-Allow-Origin', '*')
response.headers.add('Access-Control-Allow-Headers', '*')
response.headers.add('Access-Control-Allow-Methods', '*')
return response
运行(例如在 Windows 中)
set FLASK_APP=main
flask run
API 列表
运行任务:http://127.0.0.1:5000/run/ 显示进度:http://127.0.0.1:5000/progress/test.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Progress Bar</title>
<script src="https://cdn.bootcss.com/jquery/3.0.0/jquery.min.js"></script>
<script src="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/js/bootstrap.min.js"></script>
<link href="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/css/bootstrap.min.css" rel="stylesheet">
</head>
<body>
<button id="run">Run the task</button>
<br><br>
<div class="progress">
<div class="progress-bar" role="progressbar" aria-valuenow="1" aria-valuemin="0" aria-valuemax="100"
style="width: 10%">0.00%
</div>
</div>
</body>
<script>
function set_progress_rate(n, total)
//Set the rate of progress bar
var rate = (n / total * 100).toFixed(2);
if (n > 0)
$(".progress-bar").attr("aria-valuenow", n);
$(".progress-bar").attr("aria-valuemax", total);
$(".progress-bar").text(rate + "%");
$(".progress-bar").css("width", rate + "%");
$("#run").click(function ()
//Run the task
$.ajax(
url: "http://127.0.0.1:5000/run/",
type: "GET",
success: function (response)
set_progress_rate(100, 100);
console.log('Results:' + response['results']);
);
);
setInterval(function ()
//Show progress every 1 second
$.ajax(
url: "http://127.0.0.1:5000/progress/",
type: "GET",
success: function (response)
console.log(response);
var n = response["n"];
var total = response["total"];
set_progress_rate(n, total);
);
, 1000);
</script>
</html>
结果
【讨论】:
【参考方案2】:一些答案适用于进度条,但我无法从池中得到结果
我使用tqdm 创建进度条
你可以通过pip install tqdm
安装它
下面的简单代码与进度条配合得很好,你也可以得到结果:
from multiprocessing import Pool
from tqdm import tqdm
from time import sleep
tasks = range(5)
result = []
def do_work(x):
# do something with x and return the result
sleep(2)
return x + 2
if __name__ == '__main__':
pbar = tqdm(total=len(tasks))
with Pool(2) as p:
for i in p.imap_unordered(do_work, tasks):
result.append(i)
pbar.update(i)
pbar.close()
print(result)
【讨论】:
【参考方案3】:在做了一些研究之后,我写了一个名为parallelbar 的小模块。它允许您分别显示池的整体进度和每个核心的进度。 它易于使用并且有很好的描述。
例如:
from parallelbar import progress_map
from parallelbar.tools import cpu_bench
if __name__=='__main__':
# create list of task
tasks = [1_000_000 + i for i in range(100)]
progress_map(cpu_bench, tasks)
【讨论】:
【参考方案4】:Pool.apply_async()
的简单解决方案:
from multiprocessing import Pool
from tqdm import tqdm
from time import sleep
def work(x):
sleep(0.2)
return x**2
n = 10
with Pool(4) as p, tqdm(total=n) as pbar:
res = [p.apply_async(
work, args=(i,), callback=lambda _: pbar.update(1)) for i in range(n)]
results = [r.get() for r in res]
【讨论】:
完成后应该关闭 Pool 和 pbar 可能希望避免在最后一行中对池和迭代器使用 varnamep
?【参考方案5】:
按照 Tim 的建议,您可以使用 tqdm
和 imap
来解决此问题。我刚刚偶然发现了这个问题并调整了imap_unordered
解决方案,以便我可以访问映射的结果。以下是它的工作原理:
from multiprocessing import Pool
import tqdm
pool = multiprocessing.Pool(processes=4)
mapped_values = list(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)), total=len(values)))
如果您不关心作业返回的值,则无需将列表分配给任何变量。
【讨论】:
这是最好的答案。在任务完成时显示进度并返回结果。【参考方案6】:试试这个简单的基于队列的方法,它也可以与池一起使用。请注意,在进度条启动后打印任何内容都会导致它被移动,至少对于这个特定的进度条。 (PyPI的进度1.5)
import time
from progress.bar import Bar
def status_bar( queue_stat, n_groups, n ):
bar = Bar('progress', max = n)
finished = 0
while finished < n_groups:
while queue_stat.empty():
time.sleep(0.01)
gotten = queue_stat.get()
if gotten == 'finished':
finished += 1
else:
bar.next()
bar.finish()
def process_data( queue_data, queue_stat, group):
for i in group:
... do stuff resulting in new_data
queue_stat.put(1)
queue_stat.put('finished')
queue_data.put(new_data)
def multiprocess():
new_data = []
groups = [[1,2,3],[4,5,6],[7,8,9]]
combined = sum(groups,[])
queue_data = multiprocessing.Queue()
queue_stat = multiprocessing.Queue()
for i, group in enumerate(groups):
if i == 0:
p = multiprocessing.Process(target = status_bar,
args=(queue_stat,len(groups),len(combined)))
processes.append(p)
p.start()
p = multiprocessing.Process(target = process_data,
args=(queue_data, queue_stat, group))
processes.append(p)
p.start()
for i in range(len(groups)):
data = queue_data.get()
new_data += data
for p in processes:
p.join()
【讨论】:
【参考方案7】:我发现在我尝试检查进度时工作已经完成。这就是使用 tqdm 对我有用的方法。
pip install tqdm
from multiprocessing import Pool
from tqdm import tqdm
tasks = range(5)
pool = Pool()
pbar = tqdm(total=len(tasks))
def do_work(x):
# do something with x
pbar.update(1)
pool.imap_unordered(do_work, tasks)
pool.close()
pool.join()
pbar.close()
这应该适用于所有类型的多处理,无论它们是否阻塞。
【讨论】:
我认为创建了一堆线程,每个线程都是独立计数的 我的函数中有函数导致酸洗错误。 这不会为我创建一个进度条,但它有点工作。它计算迭代次数(并显示总的预期迭代次数)。尽管由于线程的原因计数会上升和下降(我猜),但在任何时候都或多或少地看到它的位置并不难。到目前为止,这对我来说最有效(我必须使用返回值,这会使其他答案复杂化)。【参考方案8】:我个人最喜欢的——在事情并行运行和提交时,给你一个漂亮的小进度条和完成 ETA。
from multiprocessing import Pool
import tqdm
pool = Pool(processes=8)
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
pass
【讨论】:
如果池返回值怎么办? 我在循环之前创建了一个名为 result 的空列表,然后在循环内只执行 result.append(x)。我用 2 个进程尝试了这个,并使用 imap 而不是 map,一切都按照我想要的方式运行 @nickpick 所以我的进度条正在迭代新行而不是原地进行,知道为什么会这样吗? 别忘了把这段代码包装在if __name__ == "__main__":
中,否则它可能会莫名其妙地不起作用
@bs7280 你所说的 result.append(x) 是指 result.append(_) 吗? x是什么?【参考方案9】:
我创建了一个自定义类来创建进度打印输出。也许这会有所帮助:
from multiprocessing import Pool, cpu_count
class ParallelSim(object):
def __init__(self, processes=cpu_count()):
self.pool = Pool(processes=processes)
self.total_processes = 0
self.completed_processes = 0
self.results = []
def add(self, func, args):
self.pool.apply_async(func=func, args=args, callback=self.complete)
self.total_processes += 1
def complete(self, result):
self.results.extend(result)
self.completed_processes += 1
print('Progress: :.2f%'.format((self.completed_processes/self.total_processes)*100))
def run(self):
self.pool.close()
self.pool.join()
def get_results(self):
return self.results
【讨论】:
【参考方案10】:通过更多挖掘自己找到了答案:查看imap_unordered
结果对象的__dict__
,我发现它有一个_index
属性,该属性随着每个任务完成而递增。所以这适用于日志记录,包裹在 while
循环中:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
completed = rs._index
if (completed == num_tasks): break
print "Waiting for", num_tasks-completed, "tasks to complete..."
time.sleep(2)
但是,我确实发现将imap_unordered
替换为map_async
会导致执行速度更快,尽管结果对象有点不同。相反,来自map_async
的结果对象具有_number_left
属性和ready()
方法:
p = multiprocessing.Pool()
rs = p.map_async(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
if (rs.ready()): break
remaining = rs._number_left
print "Waiting for", remaining, "tasks to complete..."
time.sleep(0.5)
【讨论】:
我在 Python 2.7.6 上对此进行了测试,rs._number_left 似乎是剩余的块数。因此,如果 rs._chunksize 不是 1,那么 rs._number_left 将不是剩余的列表项数。 我应该把这段代码放在哪里?我的意思是直到知道rs
的内容之后才执行它并且有点晚了?
@WakanTanka:它在分离额外线程后进入主脚本。在我原来的例子中,它进入了“while”循环,rs
已经启动了其他线程。
您能否编辑您的问题和/或答案以显示最低工作示例。我在任何循环中都看不到rs
,我是多处理新手,这会有所帮助。非常感谢。
至少在python 3.5
中,使用_number_left
的解决方案不起作用。 _number_left
表示仍有待处理的块。例如,如果我想将 50 个元素并行传递给我的函数,那么对于具有 3 个进程的线程池 _map_async()
创建 10 个块,每个块包含 5 个元素。 _number_left
然后表示这些块中有多少已经完成。【参考方案11】:
我知道这是一个相当老的问题,但是当我想在 python 中跟踪任务池的进展时,我正在做这件事。
from progressbar import ProgressBar, SimpleProgress
import multiprocessing as mp
from time import sleep
def my_function(letter):
sleep(2)
return letter+letter
dummy_args = ["A", "B", "C", "D"]
pool = mp.Pool(processes=2)
results = []
pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start()
r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]
while len(results) != len(dummy_args):
pbar.update(len(results))
sleep(0.5)
pbar.finish()
print results
基本上,您将 apply_async 与 callbak 一起使用(在这种情况下,它将返回的值附加到列表中),因此您不必等待执行其他操作。然后,在一个 while 循环中,您检查工作的进度。在这种情况下,我添加了一个小部件以使其看起来更好。
输出:
4 of 4
['AA', 'BB', 'CC', 'DD']
希望对你有帮助。
【讨论】:
必须更改:[pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]
为 (pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args)
这不是真的。生成器对象在这里不起作用。已检查。【参考方案12】:
不需要访问结果集的私有属性:
from __future__ import division
import sys
for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
sys.stderr.write('\rdone 0:%'.format(i/num_tasks))
【讨论】:
我只在代码退出后看到打印输出(不是每次迭代)。你有什么建议吗? @HananShteingart:它在我的系统(Ubuntu)上运行良好,同时使用 Python 2 和 3。我以def do_word(*a): time.sleep(.1)
为例。如果它对您不起作用,则创建一个 complete minimal code example 来演示您的问题:使用文字描述您期望发生什么以及会发生什么,提及您如何运行 Python 脚本、您的操作系统、Python 版本和post it as a new question.
我遇到了和@HananShteingart 一样的问题:这是因为我尝试使用Pool.map()
。我没有意识到 only imap()
和 imap_unordered()
以这种方式工作 - 文档只是说“map() 的惰性版本”,但实际上意味着“底层迭代器返回结果,因为它们进来”。
@simonmacmullen:问题和我的答案都使用imap_unordered()
。 Hanan 的问题可能是由于sys.stderr.write('\r..')
(覆盖同一行以显示进度)。
也可以!我主要是想记录下我所做的一个愚蠢的假设——以防其他阅读这篇文章的人也做到了。以上是关于显示 Python 多处理池 imap_unordered 调用的进度?的主要内容,如果未能解决你的问题,请参考以下文章