如何使用dask.distributed并行化嵌套循环?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何使用dask.distributed并行化嵌套循环?相关的知识,希望对你有一定的参考价值。

我试图使用看起来像这样的dask分布并行化嵌套循环:

@dask.delayed
def delayed_a(e):
    a = do_something_with(e)
    return something

@dask.delayed
def delayed_b(element):
    computations = []
    for e in element:
        computations.add(delayed_a(e))

    b = dask.compute(*computations, scheduler='distributed',
                    num_workers=4)
    return b

list = [some thousands of elements here]
computations = []
for element in list:
    computations.append(delayed_b(element))
    results = dask.compute(*computations, scheduler='distributed',
                           num_workers=4)

如你所见,我正在使用distributed调度程序。首先,我创建一个computations列表,其中包含一个懒惰的delayed_b函数,该函数将list中的一个元素作为参数。然后,delayed_b创建了一组新的computations,它们调用delayed_a函数,所有内容都以分布式执行。这个伪代码正在运行,但我发现如果delayed_a不在那里会更快。那么我的问题是 - 做循环分布式并行的正确方法是什么?

在历史的最后,我想要做的是:

list = [some thousands of elements here]
for element in list:
    for e in element:
        do_something_with(e)

我真的很感激有关使用dask.distributed完成嵌套循环的最佳方法的任何建议。

答案

简单:

something = dask.delayed(do_something_with_e
list = [some thousands of elements here]

# this could be written as a one-line comprehension
computations = []
for element in list:
    part = []
    computations.append(part)
    for e in element:
        part.append(something(e))

results = dask.compute(*computations, scheduler='distributed',
                       num_workers=4)

你永远不应该在延迟函数中调用延迟函数或compute()

(请注意,只要您创建了客户端,默认情况下就会使用分布式调度程序)

以上是关于如何使用dask.distributed并行化嵌套循环?的主要内容,如果未能解决你的问题,请参考以下文章

在dask.distributed群集中的计算机之间共享python模块

如何优化并行嵌套循环?

在 OpenMP 中并行化嵌套循环并使用更多线程执行内部循环

如何使用CUDA并行化嵌套for循环以在2D数组上执行计算

Dask 中的 KilledWorker 异常是啥意思?

CUDA:并行化具有嵌套循环的函数调用的多个嵌套for循环