多处理:使用 tqdm 显示进度条
Posted
技术标签:
【中文标题】多处理:使用 tqdm 显示进度条【英文标题】:Multiprocessing : use tqdm to display a progress bar 【发布时间】:2017-06-14 16:17:20 【问题描述】:为了使我的代码更“pythonic”和更快,我使用“多处理”和一个映射函数来发送它a)函数和b)迭代范围。
植入的解决方案(即直接在 tqdm.tqdm(range(0, 30)) 范围内调用 tqdm)不适用于多处理(如下面的代码所示)。
进度条显示从0到100%(python读取代码时?)但并不表示map函数的实际进度。
如何显示进度条,指示“地图”功能在哪一步?
from multiprocessing import Pool
import tqdm
import time
def _foo(my_number):
square = my_number * my_number
time.sleep(1)
return square
if __name__ == '__main__':
p = Pool(2)
r = p.map(_foo, tqdm.tqdm(range(0, 30)))
p.close()
p.join()
欢迎任何帮助或建议...
【问题讨论】:
可以把进度条的代码sn-p贴出来吗? 对于使用.starmap()
寻找解决方案的人:Here 是Pool
添加.istarmap()
的补丁,它也适用于tqdm
。
【参考方案1】:
import multiprocessing as mp
import tqdm
iterable = ...
num_cpu = mp.cpu_count() - 2 # dont use all cpus.
def func():
# your logic
...
if __name__ == '__main__':
with mp.Pool(num_cpu) as p:
list(tqdm.tqdm(p.imap(func, iterable), total=len(iterable)))
【讨论】:
【参考方案2】:对于带有 apply_async 的进度条,我们可以使用以下代码:
https://github.com/tqdm/tqdm/issues/484
import time
import random
from multiprocessing import Pool
from tqdm import tqdm
def myfunc(a):
time.sleep(random.random())
return a ** 2
pool = Pool(2)
pbar = tqdm(total=100)
def update(*a):
pbar.update()
for i in range(pbar.total):
pool.apply_async(myfunc, args=(i,), callback=update)
pool.close()
pool.join()
【讨论】:
【参考方案3】:抱歉迟到了,如果您只需要一个并发地图,我在tqdm>=4.42.0
中添加了此功能:
from tqdm.contrib.concurrent import process_map # or thread_map
import time
def _foo(my_number):
square = my_number * my_number
time.sleep(1)
return square
if __name__ == '__main__':
r = process_map(_foo, range(0, 30), max_workers=2)
参考:https://tqdm.github.io/docs/contrib.concurrent/ 和 https://github.com/tqdm/tqdm/blob/master/examples/parallel_bars.py
它支持max_workers
和chunksize
,您也可以轻松地从process_map
切换到thread_map
。
【讨论】:
我看到一个关于破解 tqdm_notebook 的讨论问题,但是,无法找到解决 tqdm.contrib.concurrent 的解决方案。 @Xudongprocess_map
创建、运行、关闭/加入并返回一个列表。
这太棒了!很高兴我找到了它。一个问题仍然存在,当我在 jupyter notebook 中使用它时,它不能很好地工作。我知道有一个tqdm.notebook
,有没有办法将两者合并?
这会无条件复制迭代的参数,而其他的似乎是写时复制。
@jlconlin @Vladimir Vargas 如果我做类似的事情,我没有任何问题。 thread_map(fn, *iterables, tqdm_class=tqdm.notebook.tqdm, max_workers=12)
今天在 Jupyter 笔记本中。【参考方案4】:
当您需要从并行执行的函数中获取结果时,这是我的看法。这个函数做了一些事情(我的另一篇文章进一步解释了它),但关键是有一个任务待处理队列和一个任务完成队列。当工作人员完成待处理队列中的每个任务时,他们会将结果添加到任务完成队列中。您可以使用 tqdm 进度条将检查包装到任务完成队列中。 do_work() 函数的实现我没有放在这里,它不相关,因为这里的消息是监控任务完成队列并在每次有结果时更新进度条。
def par_proc(job_list, num_cpus=None, verbose=False):
# Get the number of cores
if not num_cpus:
num_cpus = psutil.cpu_count(logical=False)
print('* Parallel processing')
print('* Running on cores'.format(num_cpus))
# Set-up the queues for sending and receiving data to/from the workers
tasks_pending = mp.Queue()
tasks_completed = mp.Queue()
# Gather processes and results here
processes = []
results = []
# Count tasks
num_tasks = 0
# Add the tasks to the queue
for job in job_list:
for task in job['tasks']:
expanded_job =
num_tasks = num_tasks + 1
expanded_job.update('func': pickle.dumps(job['func']))
expanded_job.update('task': task)
tasks_pending.put(expanded_job)
# Set the number of workers here
num_workers = min(num_cpus, num_tasks)
# We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
# work left to be done.
for c in range(num_workers):
tasks_pending.put(SENTINEL)
print('* Number of tasks: '.format(num_tasks))
# Set-up and start the workers
for c in range(num_workers):
p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed, verbose))
p.name = 'worker' + str(c)
processes.append(p)
p.start()
# Gather the results
completed_tasks_counter = 0
with tqdm(total=num_tasks) as bar:
while completed_tasks_counter < num_tasks:
results.append(tasks_completed.get())
completed_tasks_counter = completed_tasks_counter + 1
bar.update(completed_tasks_counter)
for p in processes:
p.join()
return results
【讨论】:
【参考方案5】:找到的解决方案:小心!由于多处理,估计时间(每个循环的迭代次数、总时间等)可能不稳定,但进度条运行良好。
注意:Pool 的上下文管理器仅适用于 Python 3.3 版
from multiprocessing import Pool
import time
from tqdm import *
def _foo(my_number):
square = my_number * my_number
time.sleep(1)
return square
if __name__ == '__main__':
with Pool(processes=2) as p:
max_ = 30
with tqdm(total=max_) as pbar:
for i, _ in enumerate(p.imap_unordered(_foo, range(0, max_))):
pbar.update()
【讨论】:
这里需要第二个/内部tqdm
调用吗?
返回为“r”的 _foo(my_number) 的输出如何?
starmap()
有类似的解决方案吗?
@shadowtalker - 它似乎没有;)。无论如何 - imap_unordered
是这里的关键,它提供了最佳性能和最佳进度条估计。
如何使用此解决方案检索结果?【参考方案6】:
这种方法简单而且有效。
from multiprocessing.pool import ThreadPool
import time
from tqdm import tqdm
def job():
time.sleep(1)
pbar.update()
pool = ThreadPool(5)
with tqdm(total=100) as pbar:
for i in range(100):
pool.apply_async(job)
pool.close()
pool.join()
【讨论】:
【参考方案7】:您可以改用p_tqdm
。
https://github.com/swansonk14/p_tqdm
from p_tqdm import p_map
import time
def _foo(my_number):
square = my_number * my_number
time.sleep(1)
return square
if __name__ == '__main__':
r = p_map(_foo, list(range(0, 30)))
【讨论】:
这非常好用,pip install
很容易。这将取代 tqdm 来满足我的大部分需求
谢谢维克多 ;)
p_tqdm
仅限于multiprocessing.Pool
,不适用于线程
我可以指定p_map的worker数量吗?
@VictorWang 是的,像这样在num_cpus
中使用它 => p_map(_foo, list(range(0, 30)), num_cpus=5)
【参考方案8】:
根据 Xavi Martínez 的回答,我编写了函数 imap_unordered_bar
。它的使用方式与imap_unordered
相同,唯一的区别是显示了一个处理栏。
from multiprocessing import Pool
import time
from tqdm import *
def imap_unordered_bar(func, args, n_processes = 2):
p = Pool(n_processes)
res_list = []
with tqdm(total = len(args)) as pbar:
for i, res in tqdm(enumerate(p.imap_unordered(func, args))):
pbar.update()
res_list.append(res)
pbar.close()
p.close()
p.join()
return res_list
def _foo(my_number):
square = my_number * my_number
time.sleep(1)
return square
if __name__ == '__main__':
result = imap_unordered_bar(_foo, range(5))
【讨论】:
这将在新行的每一步重新绘制条。如何更新同一行? 我的解决方案(Windows/Powershell):Colorama。 'pbar.close() 不需要,它会在 with 终止时自动关闭,就像 Sagar 在 @scipy 的回答中所做的评论一样【参考方案9】:使用 imap 代替 map,它返回处理值的迭代器。
from multiprocessing import Pool
import tqdm
import time
def _foo(my_number):
square = my_number * my_number
time.sleep(1)
return square
if __name__ == '__main__':
with Pool(2) as p:
r = list(tqdm.tqdm(p.imap(_foo, range(30)), total=30))
【讨论】:
一个封闭的 list() 语句等待迭代器结束。 total= 也是必需的,因为 tqdm 不知道迭代需要多长时间,starmap()
有类似的解决方案吗?
for i in tqdm.tqdm(...): pass
可能更直接,list(tqdm.tqdm)
这可行,但有没有其他人让它在每次迭代的换行符上连续打印进度条?
当特定 chunk_size
或 p.imap
时,行为是有线的。 tqdm
可以更新每个迭代而不是每个块吗?以上是关于多处理:使用 tqdm 显示进度条的主要内容,如果未能解决你的问题,请参考以下文章
pqdm 是 tqdm 和 concurrent.futures 的 wrapper | 一个小而美的 Python 并行计算库 | 实现多进程显示进度条的优雅方案
pqdm 是 tqdm 和 concurrent.futures 的 wrapper | 一个小而美的 Python 并行计算库 | 实现多进程显示进度条的优雅方案