星图与tqdm结合?

Posted

技术标签:

【中文标题】星图与tqdm结合?【英文标题】:Starmap combined with tqdm? 【发布时间】:2019-12-12 17:56:18 【问题描述】:

我在做一些并行处理,如下:

with mp.Pool(8) as tmpPool:
        results = tmpPool.starmap(my_function, inputs)

输入如下所示: [(1,0.2312),(5,0.52) ...] 即,int 和 float 的元组。

代码运行良好,但我似乎无法将其包裹在加载栏 (tqdm) 周围,例如可以使用 imap 方法完成,如下所示:

tqdm.tqdm(mp.imap(some_function,some_inputs))

星图也可以这样做吗?

谢谢!

【问题讨论】:

如果可能的话,我会说更改您的my_function 以接收一个打包参数并将其解压缩到函数中,然后使用imap 是的,这是目前的默认解决方案。我仍然想知道星图是否支持这个(或它的任何变体) 不是我知道或可以在文档中看到的。我知道的唯一变体是starmap_async,它只是非阻塞的,但仍返回一个结果对象。我相信您将不得不调整您的函数以使用imap,因为它是唯一可以用作生成器并且不会立即返回所有结果的选项。很乐意看看是否有更好的解决方案 谢谢,目前,我已经用 imap 重新实现了它。如果也有 istarmap 那就太好了! 【参考方案1】:

临时解决办法:用imap重写待并行化的方法。

【讨论】:

【参考方案2】:

starmap() 无法实现,但通过添加 Pool.istarmap() 的补丁可以实现。它基于imap() 的代码。您所要做的就是创建istarmap.py-文件并导入模块以应用补丁,然后再进行常规的多处理导入。

Python

# istarmap.py for Python <3.8
import multiprocessing.pool as mpp


def istarmap(self, func, iterable, chunksize=1):
    """starmap-version of imap
    """
    if self._state != mpp.RUN:
        raise ValueError("Pool not running")

    if chunksize < 1:
        raise ValueError(
            "Chunksize must be 1+, not 0:n".format(
                chunksize))

    task_batches = mpp.Pool._get_tasks(func, iterable, chunksize)
    result = mpp.IMapIterator(self._cache)
    self._taskqueue.put(
        (
            self._guarded_task_generation(result._job,
                                          mpp.starmapstar,
                                          task_batches),
            result._set_length
        ))
    return (item for chunk in result for item in chunk)


mpp.Pool.istarmap = istarmap

Python 3.8+

# istarmap.py for Python 3.8+
import multiprocessing.pool as mpp


def istarmap(self, func, iterable, chunksize=1):
    """starmap-version of imap
    """
    self._check_running()
    if chunksize < 1:
        raise ValueError(
            "Chunksize must be 1+, not 0:n".format(
                chunksize))

    task_batches = mpp.Pool._get_tasks(func, iterable, chunksize)
    result = mpp.IMapIterator(self)
    self._taskqueue.put(
        (
            self._guarded_task_generation(result._job,
                                          mpp.starmapstar,
                                          task_batches),
            result._set_length
        ))
    return (item for chunk in result for item in chunk)


mpp.Pool.istarmap = istarmap

然后在你的脚本中:

import istarmap  # import to apply patch
from multiprocessing import Pool
import tqdm    


def foo(a, b):
    for _ in range(int(50e6)):
        pass
    return a, b    


if __name__ == '__main__':

    with Pool(4) as pool:
        iterable = [(i, 'x') for i in range(10)]
        for _ in tqdm.tqdm(pool.istarmap(foo, iterable),
                           total=len(iterable)):
            pass

【讨论】:

非常好,这正是我所追求的!谢谢! 我收到AttributeError: '_PoolCache' object has no attribute '_cache' - 有什么想法吗?它发生在result = mp.IMapIterator(self._cache)@wfgeo 我使用mpp 作为模块的名称,您的示例使用mp。您是否也从答案中得到了与我的示例完全相同的错误? 是的,我刚刚替换了mppmp,这只是个人约定,抱歉。我确实使用相同的代码得到了错误,但这是因为我没有调用模块istarmap。我目前无法将它捆绑到我自己的模块中,但是,如果我将 istarmap 作为子模块放在我自己的模块中,我似乎无法弄清楚导入语句 @wfgeo 没关系,我只是不知道您是否还有其他名为 mp 的模块。恐怕这些信息不足以理解您的问题,但您需要导入 istarmap 从多处理导入其他任何内容之前。【参考方案3】:

最简单的方法可能是在输入周围应用 tqdm(),而不是映射函数。例如:

inputs = zip(param1, param2, param3)
with mp.Pool(8) as pool:
    results = pool.starmap(my_function, tqdm.tqdm(inputs, total=len(inputs)))

【讨论】:

非常感谢。我认为这应该是公认的答案。我必须将输入长度作为total 传递给tqdm 才能使其工作。 你是对的,你可能需要总 arg 用于流式传输/延迟迭代 快速更新:这确实提供了进度条,但更新不像我希望的那样动态。它冻得太厉害了。 你使用了 chunksize != 1 吗?可能是从输入中分块提取元素,因此进度条不定期更新 注意:zip 对象没有长度。相反,total=len(param1) 会起作用【参考方案4】:

正如 Darkonaut 所提到的,在撰写本文时,本机没有 istarmap。如果您想避免打补丁,可以添加一个简单的 *_star 函数作为解决方法。 (这个解决方案灵感来自this tutorial.)

import tqdm
import multiprocessing

def my_function(arg1, arg2, arg3):
  return arg1 + arg2 + arg3

def my_function_star(args):
    return my_function(*args)

jobs = 4
with multiprocessing.Pool(jobs) as pool:
    args = [(i, i, i) for i in range(10000)]
    results = list(tqdm.tqdm(pool.imap(my_function_star, args), total=len(args))

一些注意事项:

我也很喜欢 corey 的回答。它更干净,尽管进度条似乎不像我的回答那样顺利更新。请注意,使用我在上面使用chunksize=1(默认)发布的代码,corey 的答案要快几个数量级。我猜这是由于多处理序列化,因为增加chunksize(或拥有更昂贵的my_function)使它们的运行时间具有可比性。

由于我的序列化/函数成本比率非常低,因此我为我的应用程序提供了答案。

【讨论】:

这是最好的答案!您对 corey 答案的注释是正确的!

以上是关于星图与tqdm结合?的主要内容,如果未能解决你的问题,请参考以下文章

地图与星图的性能?

tqdm()与set_description()的用法

将 tqdm 与 concurrent.futures 一起使用?

一个核糖体与mRNA的结合部位,为啥形成2个tRNA结合位点

如何在 jupyter 笔记本中将 tqdm 与 pandas 一起使用?

VR)与GIS技术相结合,在哪些领域得到应用,举一个应