将 tqdm 与 concurrent.futures 一起使用?

Posted

技术标签:

【中文标题】将 tqdm 与 concurrent.futures 一起使用?【英文标题】:Use tqdm with concurrent.futures? 【发布时间】:2019-01-07 03:56:03 【问题描述】:

我有一个多线程函数,我想要一个状态栏来使用tqdm。有没有一种简单的方法可以用ThreadPoolExecutor 显示状态栏?让我困惑的是并行化部分。

import concurrent.futures

def f(x):
    return f**2

my_iter = range(1000000)

def run(f,my_iter):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        function = list(executor.map(f, my_iter))
    return results

run(f, my_iter) # wrap tqdr around this function?

【问题讨论】:

你可以使用from tqdm.contrib.concurrent import process_map见***.com/questions/41920124/… 【参考方案1】:

您可以将tqdm 包裹在executor 周围,如下所示以跟踪进度:

list(tqdm(executor.map(f, iter), total=len(iter))

这是你的例子:

import time  
import concurrent.futures
from tqdm import tqdm

def f(x):
    time.sleep(0.001)  # to visualize the progress
    return x**2

def run(f, my_iter):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = list(tqdm(executor.map(f, my_iter), total=len(my_iter)))
    return results

my_iter = range(100000)
run(f, my_iter)

结果是这样的:

16%|██▏           | 15707/100000 [00:00<00:02, 31312.54it/s]

【讨论】:

谢谢!关键似乎是tqdm周围的list(),为什么会这样呢? @DreamFlasher:这种行为是因为 tqdm 在执行时运行。 Executor.map 本身只是一个生成器。 这样,你不会立即得到输出!所以你必须等到完整的进度完成才能看到完整的结果! tqdm 中的total 参数很重要。没有它,我们就看不到整体的进步。 这会阻塞进度条的时间更新,有没有办法解决?【参考方案2】:

接受答案的问题是ThreadPoolExecutor.map 函数必须生成结果,而不是按照它们可用的顺序。因此,如果myfunc 的第一次调用恰好是最后一次调用,则进度条将同时从 0% 变为 100%,并且仅当所有调用都完成时。将ThreadPoolExecutor.submitas_completed 一起使用会更好:

import time
import concurrent.futures
from tqdm import tqdm

def f(x):
    time.sleep(0.001)  # to visualize the progress
    return x**2

def run(f, my_iter):
    l = len(my_iter)
    with tqdm(total=l) as pbar:
        # let's give it some more threads:
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = executor.submit(f, arg): arg for arg in my_iter
            results = 
            for future in concurrent.futures.as_completed(futures):
                arg = futures[future]
                results[arg] = future.result()
                pbar.update(1)
    print(321, results[321])

my_iter = range(100000)
run(f, my_iter)

打印:

321 103041

这只是一般的想法。根据my_iter 的类型,可能无法直接将len 函数直接应用于它而不先将其转换为列表。重点是submitas_completed一起使用。

【讨论】:

谢谢!这确实有帮助,但出于某种原因,进度条在一段时间后停止了? 只是想提一下,只要稍加修改(移至def main()),这与ProcessPoolExecutor 一样有效,如果f(x) 实际进行计算,它会更快,因为它不是受全局解释器锁影响。 刚才有人问我,这里是适应ProcessPoolExecutorgist.github.com/ltalirz/9220946c5c9fd920a1a2d81ce7375c47的示例代码 @leopold.talirz 当然,如果不是为了“可视化结果”而添加的对 sleep 的调用,即使对于多处理,函数 f 也确实是一个糟糕的候选者因为它的 CPU 密集度不足以证明增加的开销是合理的(也就是说,只在循环中调用 f 会更快)。据我了解,问题的真正意义在于如何更新进度条。但值得一提的是,通过调用sleep,多线程比使用这个特定的f 函数 的多线程处理更好,因为它减少了开销。 这会阻塞进度条的时间更新,有没有办法解决?【参考方案3】:

我认为最简单的方法:

with ThreadPoolExecutor(max_workers=20) as executor:
    results = list(tqdm(executor.map(myfunc, range(len(my_array))), total=len(my_array)))

【讨论】:

以上是关于将 tqdm 与 concurrent.futures 一起使用?的主要内容,如果未能解决你的问题,请参考以下文章

星图与tqdm结合?

创建线程的三种方式及其优缺点

java线程的6种状态以及相互转换

tqdm()与set_description()的用法

如何在 jupyter 笔记本中将 tqdm 与 pandas 一起使用?

tqdm, pyyaml, traceback的使用