我们如何在与 joblib 的并行执行中使用 tqdm?

Posted

技术标签:

【中文标题】我们如何在与 joblib 的并行执行中使用 tqdm?【英文标题】:How can we use tqdm in a parallel execution with joblib? 【发布时间】:2016-10-14 17:30:31 【问题描述】:

我想并行运行一个函数,并等待所有并行节点完成,使用 joblib。就像在示例中一样:

from math import sqrt
from joblib import Parallel, delayed
Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10))

但是,我希望像 tqdm 一样在单个进度条中看到执行,显示已完成的作业数。

你会怎么做?

【问题讨论】:

danshiebler.com/2016-09-14-parallel-progress-bar 也许这个网站可以帮到你。 看看下面的 niedakh! 【参考方案1】:

只需将range(10) 放入tqdm(...)!这对你来说可能看起来太好了,但它确实有效(在我的机器上):

from math import sqrt
from joblib import Parallel, delayed  
from tqdm import tqdm  
result = Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in tqdm(range(100000)))

【讨论】:

这仅在进程开始时显示进度,而不是在完成时显示:Parallel(n_jobs=10)(delayed(time.sleep)(i ** 2) for i in tqdm(range(10))) 它可以工作,但不适用于例如字符串列表...还尝试将列表包装在iter... @curious95 尝试将列表放入生成器,以下似乎对我有用:from math import sqrt from joblib import Parallel, delayed import multiprocessing from tqdm import tqdm rng = range(100000) rng = ['a','b','c','d'] for j in range(20): rng += rng def get_rng(): i = 0 for i in range(len(rng)): yield rng[i] result = Parallel(n_jobs=2)(delayed(sqrt)(len(i) ** 2) for i in tqdm(get_rng())) 另外一个问题,有一个很优雅的solution来解决这个问题。 这行不通,tqdm 将立即转到 %100。【参考方案2】:

我已经创建了 pqdm 一个并行的 tqdm 包装器,它带有并发的未来来轻松完成这项工作,试一试!

安装

pip install pqdm

并使用

from pqdm.processes import pqdm
# If you want threads instead:
# from pqdm.threads import pqdm

args = [1, 2, 3, 4, 5]
# args = range(1,6) would also work

def square(a):
    return a*a

result = pqdm(args, square, n_jobs=2)

【讨论】:

干得好家伙!无法忍受你为什么不被接受。非常感谢! 不幸的是,这对我来说失败了。我不知道为什么,但看起来 pqdm 并没有等到函数调用结束。我现在没有时间创建 MWE。不过,感谢您的努力(和 +1)。 @YairDaon 可能会尝试使用有界执行器,尝试将bounded=True 添加到 pqdm。 这就像一个魅力,感谢图书馆。有帮助! 它是否适用于列表理解?【参考方案3】:

修改 nth's great answer 以允许动态标志使用或不使用 TQDM,并提前指定总数,以便正确填写状态栏。

from tqdm.auto import tqdm
from joblib import Parallel

class ProgressParallel(Parallel):
    def __init__(self, use_tqdm=True, total=None, *args, **kwargs):
        self._use_tqdm = use_tqdm
        self._total = total
        super().__init__(*args, **kwargs)

    def __call__(self, *args, **kwargs):
        with tqdm(disable=not self._use_tqdm, total=self._total) as self._pbar:
            return Parallel.__call__(self, *args, **kwargs)

    def print_progress(self):
        if self._total is None:
            self._pbar.total = self.n_dispatched_tasks
        self._pbar.n = self.n_completed_tasks
        self._pbar.refresh()

【讨论】:

【参考方案4】:

如上所述,仅包装传递给joblib.Parallel() 的可迭代的解决方案并不能真正监控执行进度。相反,我建议继承 Parallel 并覆盖 print_progress() 方法,如下所示:

import joblib
from tqdm.auto import tqdm

class ProgressParallel(joblib.Parallel):
    def __call__(self, *args, **kwargs):
        with tqdm() as self._pbar:
            return joblib.Parallel.__call__(self, *args, **kwargs)

    def print_progress(self):
        self._pbar.total = self.n_dispatched_tasks
        self._pbar.n = self.n_completed_tasks
        self._pbar.refresh()

【讨论】:

【参考方案5】:

这里是可能的解决方法

def func(x):
    time.sleep(random.randint(1, 10))
    return x

def text_progessbar(seq, total=None):
    step = 1
    tick = time.time()
    while True:
        time_diff = time.time()-tick
        avg_speed = time_diff/step
        total_str = 'of %n' % total if total else ''
        print('step', step, '%.2f' % time_diff, 
              'avg: %.2f iter/sec' % avg_speed, total_str)
        step += 1
        yield next(seq)

all_bar_funcs = 
    'tqdm': lambda args: lambda x: tqdm(x, **args),
    'txt': lambda args: lambda x: text_progessbar(x, **args),
    'False': lambda args: iter,
    'None': lambda args: iter,


def ParallelExecutor(use_bar='tqdm', **joblib_args):
    def aprun(bar=use_bar, **tq_args):
        def tmp(op_iter):
            if str(bar) in all_bar_funcs.keys():
                bar_func = all_bar_funcs[str(bar)](tq_args)
            else:
                raise ValueError("Value %s not supported as bar type"%bar)
            return Parallel(**joblib_args)(bar_func(op_iter))
        return tmp
    return aprun

aprun = ParallelExecutor(n_jobs=5)

a1 = aprun(total=25)(delayed(func)(i ** 2 + j) for i in range(5) for j in range(5))
a2 = aprun(total=16)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
a2 = aprun(bar='txt')(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
a2 = aprun(bar=None)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))

【讨论】:

这是一个四处走走,但进度条只有在分派任务时才会更新。更新进度条的更好时机是任务完成的时间。【参考方案6】:

如果您的问题由多个部分组成,您可以将这些部分拆分为k 子组,并行运行每个子组并更新其间的进度条,从而使k 更新进度。

这在文档中的以下示例中得到了演示。

>>> with Parallel(n_jobs=2) as parallel:
...    accumulator = 0.
...    n_iter = 0
...    while accumulator < 1000:
...        results = parallel(delayed(sqrt)(accumulator + i ** 2)
...                           for i in range(5))
...        accumulator += sum(results)  # synchronization barrier
...        n_iter += 1

https://pythonhosted.org/joblib/parallel.html#reusing-a-pool-of-workers

【讨论】:

这如何回答关于“单个进度条”的问题? 这绝对不能回答关于进度条的问题

以上是关于我们如何在与 joblib 的并行执行中使用 tqdm?的主要内容,如果未能解决你的问题,请参考以下文章

多返回值函数的joblib并行处理

使用 Joblib 将类对象实例作为输入参数的并行化函数

Python,与 joblib 并行化:延迟多个参数

joblib保存模型和joblib的并行化处理和tqdm

这个joblib是什么并行语法呢?这么多括号

多个进程共享一个 Joblib 缓存