Dask分布式中如何高效提交大参数任务?

Posted

技术标签:

【中文标题】Dask分布式中如何高效提交大参数任务?【英文标题】:How to efficiently submit tasks with large arguments in Dask distributed? 【发布时间】:2017-05-19 04:22:53 【问题描述】:

我想提交带有大(千兆字节)参数的 Dask 函数。做这个的最好方式是什么?我想用不同的(小)参数多次运行这个函数。

示例(错误)

这使用 concurrent.futures 接口。我们可以很容易地使用 dask.delayed 接口。

x = np.random.random(size=100000000)  # 800MB array
params = list(range(100))             # 100 small parameters

def f(x, param):
    pass

from dask.distributed import Client
c = Client()

futures = [c.submit(f, x, param) for param in params]

但这比我预期的要慢或导致内存错误。

【问题讨论】:

【参考方案1】:

好的,所以这里的问题是每个任务都包含一个很大的numpy数组x。对于我们提交的 100 个任务中的每一个,我们需要序列化 ​​x,将其发送到调度程序,将其发送给工作人员等。

相反,我们将数组发送到集群一次:

[future] = c.scatter([x])

现在future 是一个指向位于集群上的数组x 的令牌。现在我们可以提交引用这个远程未来的任务,而不是我们本地客户端上的 numpy 数组。

# futures = [c.submit(f, x, param) for param in params]  # sends x each time
futures = [c.submit(f, future, param) for param in params]  # refers to remote x already on cluster

现在速度更快了,让 Dask 更有效地控制数据移动。

将数据分散到所有工作人员

如果您希望最终需要将数组 x 移动到所有工作人员,那么您可能需要广播数组以开始

[future] = c.scatter([x], broadcast=True)

延迟使用 Dask

期货也可以与 dask.delayed 一起正常工作。这里没有性能优势,但有些人更喜欢这个界面:

# futures = [c.submit(f, future, param) for param in params]

from dask import delayed
lazy_values = [delayed(f)(future, param) for param in params]
futures = c.compute(lazy_values)

【讨论】:

谢谢,很有用! scatter 命令返回的未来的用法(在您的示例中作为函数的参数)未在文档中解释。 在一个任务中(上面的f)是否可以引用futurex 的广播集群版本)而不将它作为参数传递给submit?例如,在 spark 中,您可以广播一个变量,然后使用全局上下文在您的任务中引用它。

以上是关于Dask分布式中如何高效提交大参数任务?的主要内容,如果未能解决你的问题,请参考以下文章

Airflow 中文文档:用Dask扩展

我们如何在 dask 分布式中为每个工作人员选择 --nthreads 和 --nprocs?

如何使用分布式 Dask 和预训练的 Keras 模型进行模型预测?

如何在 dask 分布式工作人员上设置日志记录?

Dask Worker 进程内存不断增长

dask分布式数据帧上的慢len函数