在函数中执行循环多处理的最快方法?

Posted

技术标签:

【中文标题】在函数中执行循环多处理的最快方法?【英文标题】:Fastest way to perform Multiprocessing of a loop in a function? 【发布时间】:2020-07-31 08:42:47 【问题描述】:

1.我有一个函数var。我想知道通过利用系统拥有的所有处理器、内核、线程和 RAM 内存的多处理/并行处理,在此函数中快速运行循环的最佳方法。

import numpy
from pysheds.grid import Grid

xs = 82.1206, 72.4542, 65.0431, 83.8056, 35.6744
ys = 25.2111, 17.9458, 13.8844, 10.0833, 24.8306

a = r'/home/test/image1.tif'
b = r'/home/test/image2.tif'

def var(interest):
    
    variable_avg = []
    for (x,y) in zip(xs,ys):
        grid = Grid.from_raster(interest, data_name='map')

        grid.catchment(data='map', x=x, y=y, out_name='catch')

        variable = grid.view('catch', nodata=np.nan)
        variable = numpy.array(variable)
        variablemean = (variable).mean()
        variable_avg.append(variablemean)
    return(variable_avg)

2. 如果我可以同时运行函数var 并针对给定的函数的多个参数在其中并行循环,那就太好了。 例如:var(a)var(b)同时。因为与单独并行循环相比,它消耗的时间要少得多。

如果没有意义,请忽略 2。

【问题讨论】:

顺便说一句,不要在内部使用函数 variable 的名称作为变量的名称。或许不要使用variable,而是使用更多描述性的词语。 您的代码甚至没有使用ab... 如果我使用var(a),代码将用r'/home/test/image1.tif' 代替grid.image(data=interest, x=x, y=y, out_name='catch') 中的interest。同样适用于参数b 以及使用r'/home/test/image2.tif' 代替兴趣。 也许 numba 会有所帮助:numba.pydata.org. @Daniser 感谢您的建议。但是,numba 在这种情况下不起作用。 【参考方案1】:

TLDR: 您可以使用多处理库并行运行 var 函数。但是,正如所写,您可能没有对var 进行足够的调用,因为它的开销导致多处理具有性能优势。如果您需要做的只是运行这两个调用,那么串行运行可能是您将获得的最快速度。但是,如果您需要进行大量调用,多处理可以帮助您。

我们需要使用进程池来并行运行它,线程不会在这里工作,因为 Python 的全局解释器锁会阻止我们实现真正的并行性。进程池的缺点是进程是重量级的。在仅对 var 运行两次调用的示例中,创建池的时间超过了运行 var 本身所花费的时间。

为了说明这一点,让我们使用进程池并使用 asyncio 并行运行对 var 的调用,并将其与仅按顺序运行的情况进行比较。请注意,运行此示例时,我使用了来自 Pysheds 库 https://github.com/mdbartos/pysheds/tree/master/data 的图像 - 如果您的图像更大,则以下可能不成立。

import functools
import time
from concurrent.futures.process import ProcessPoolExecutor
import asyncio

a = 'diem.tif'
xs = 10, 20, 30, 40, 50
ys = 10, 20, 30, 40, 50

async def main():
    loop = asyncio.get_event_loop()
    pool_start = time.time()
    with ProcessPoolExecutor() as pool:
        task_one = loop.run_in_executor(pool, functools.partial(var, a))
        task_two = loop.run_in_executor(pool, functools.partial(var, a))
        results = await asyncio.gather(task_one, task_two)
        pool_end = time.time()
        print(f'Process pool took pool_end-pool_start')

    serial_start = time.time()

    result_one = var(a)
    result_two = var(a)

    serial_end = time.time()
    print(f'Running in serial took serial_end - serial_start')


if __name__ == "__main__":
    asyncio.run(main())

在我的机器(2.4 GHz 8 核 Intel Core i9)上运行上述程序,我得到以下输出:

Process pool took 1.7581260204315186
Running in serial took 0.32335805892944336

在此示例中,进程池的速度要慢五倍以上!这是由于创建和管理多个进程的开销。也就是说,如果您需要多次调用var,则进程池可能更有意义。让我们修改它以运行 var 100 次并比较结果:

async def main():
    loop = asyncio.get_event_loop()
    pool_start = time.time()
    tasks = []
    with ProcessPoolExecutor() as pool:
        for _ in range(100):
            tasks.append(loop.run_in_executor(pool, functools.partial(var, a)))
        results = await asyncio.gather(*tasks)
        pool_end = time.time()
        print(f'Process pool took pool_end-pool_start')

    serial_start = time.time()

    for _ in range(100):
        result = var(a)

    serial_end = time.time()
    print(f'Running in serial took serial_end - serial_start')

运行 100 次,我得到以下输出:

Process pool took 3.442288875579834
Running in serial took 13.769982099533081

在这种情况下,在进程池中运行大约快 4 倍。您可能还希望尝试同时运行循环的每个迭代。您可以通过创建一个函数来执行此操作,该函数一次处理一个 x,y 坐标,然后在进程池中运行您要检查的每个点:

def process_poi(interest, x, y):
    grid = Grid.from_raster(interest, data_name='map')

    grid.catchment(data='map', x=x, y=y, out_name='catch')

    variable = grid.view('catch', nodata=np.nan)
    variable = np.array(variable)
    return variable.mean()

async def var_loop_async(interest, pool, loop):
    tasks = []
    for (x,y) in zip(xs,ys):
        function_call = functools.partial(process_poi, interest, x, y)
        tasks.append(loop.run_in_executor(pool, function_call))

    return await asyncio.gather(*tasks)

async def main():
    loop = asyncio.get_event_loop()
    pool_start = time.time()
    tasks = []
    with ProcessPoolExecutor() as pool:
        for _ in range(100):
            tasks.append(var_loop_async(a, pool, loop))
        results = await asyncio.gather(*tasks)
        pool_end = time.time()
        print(f'Process pool took pool_end-pool_start')

    serial_start = time.time() 

在这种情况下,我得到Process pool took 3.2950568199157715 - 所以并不比我们的第一个版本快,每次调用var 一个进程。这可能是因为此时的限制因素是我们的 CPU 上有多少内核可用,将我们的工作分成更小的增量不会增加太多价值。

也就是说,如果您希望跨两个图像检查 1000 个 x 和 y 坐标,那么最后一种方法可能会带来性能提升。

【讨论】:

感谢您的详细解答。我总共有 20000 个 xy 坐标和 20 个唯一的 var 参数(如上面示例代码中的 ab)。使用此数字获得最大速度的最佳方法是什么? @Ganesh 在这种情况下,由于坐标太多,var_loop_async 中描述的方法可能表现最好。我在 20 个 var 参数中使用 1000 个 x 和 y 坐标尝试了这种方法,并且能够在大约 80 秒内运行,而不是通过在进程中运行每个 var 调用来运行 95 秒。不过,我会在您的机器上进行基准测试以确定最佳方法。 我可以知道我应该在var_loop_async(interest, pool, loop) 中的参数poolloop 上传递什么吗?而且,在for _ in range(100): 中,数字 100 将始终保持不变,还是会根据输入而变化?我可以确切地知道那里发生了什么,哪个是要调用的最终函数以及它的参数是什么,因为在我发布的原始代码中,我只有interest 可以传递给唯一的函数var(interest),但在这里我看到了两个不同的功能process_poi(), var_loop_async()。未知参数 x, y, pool, loop. @Ganesh 您应该传入pool 的进程池和loop 的异步事件循环 - 这是在答案中的main 方法中完成的。您应该能够删除 for _ in range(100) - 这只是为了演示使用 100 个 var 参数调用时的执行情况。您应该为每个 var 参数调用一次 var_loop_asyncprocess_poi 用于一次处理一个 x,y 坐标。 @MattFowler 我尝试调用函数var_loop_async(a, pool, loop),但它不起作用。系统说 poolloop 没有定义。您能否通过在其位置传递参数poolloop 来举例说明。【参考方案2】:

我认为这是一种通过仅并行化主循环来加快代码速度的合理且直接的方法。你可以用这个来饱和你的核心,所以也不需要为interest变量并行化。我无法测试代码,所以我假设你的函数是正确的,我刚刚将 loop 编码到一个新函数中,并将其并行化到 var() 中。

from multiprocessing import Pool


def var(interest,xs,ys):
    grid = Grid.from_raster(interest, data_name='map')
    with Pool(4) as p: #uses 4 cores, adjust this as you need
        variable_avg = p.starmap(loop, [(x,y,grid) for x,y in zip(xs,ys)])
    return variable_avg
    

def loop(x, y, grid):
    grid.catchment(data='map', x=x, y=y, out_name='catch')
    variable = grid.view('catch', nodata=np.nan)
    variable = numpy.array(variable)
    return variable.mean()

【讨论】:

感谢您的努力。当我运行你的代码时,我收到了一个错误NameError: nodata value for 'map' not found in instance.。不过原代码运行流畅,没有任何错误。 我不知道网格是如何工作的,你可能也必须通过map,因为你有data=map。不幸的是,从您的代码中不清楚map 的来源。你必须稍微调整一下。 不幸的是,如果你不能给我们一个最小的可重复的例子***.com/help/minimal-reproducible-example我们也帮不上什么忙。我们需要能够自己运行代码来修复它。

以上是关于在函数中执行循环多处理的最快方法?的主要内容,如果未能解决你的问题,请参考以下文章

for 循环的 5 种写法,哪种最快?

for 循环的 5 种写法,哪种最快?

钳制真实(固定/浮点)值的最快方法?

钳制真实(固定/浮点)值的最快方法?

Python多处理:向所有进程发出事件信号的最快方法?

什么是最快的?循环内部或外部的条件?