Dask分布式中如何高效提交大参数任务?
Posted
技术标签:
【中文标题】Dask分布式中如何高效提交大参数任务?【英文标题】:How to efficiently submit tasks with large arguments in Dask distributed? 【发布时间】:2017-05-19 04:22:53 【问题描述】:我想提交带有大(千兆字节)参数的 Dask 函数。做这个的最好方式是什么?我想用不同的(小)参数多次运行这个函数。
示例(错误)
这使用 concurrent.futures 接口。我们可以很容易地使用 dask.delayed 接口。
x = np.random.random(size=100000000) # 800MB array
params = list(range(100)) # 100 small parameters
def f(x, param):
pass
from dask.distributed import Client
c = Client()
futures = [c.submit(f, x, param) for param in params]
但这比我预期的要慢或导致内存错误。
【问题讨论】:
【参考方案1】:好的,所以这里的问题是每个任务都包含一个很大的numpy数组x
。对于我们提交的 100 个任务中的每一个,我们需要序列化 x
,将其发送到调度程序,将其发送给工作人员等。
相反,我们将数组发送到集群一次:
[future] = c.scatter([x])
现在future
是一个指向位于集群上的数组x
的令牌。现在我们可以提交引用这个远程未来的任务,而不是我们本地客户端上的 numpy 数组。
# futures = [c.submit(f, x, param) for param in params] # sends x each time
futures = [c.submit(f, future, param) for param in params] # refers to remote x already on cluster
现在速度更快了,让 Dask 更有效地控制数据移动。
将数据分散到所有工作人员
如果您希望最终需要将数组 x 移动到所有工作人员,那么您可能需要广播数组以开始
[future] = c.scatter([x], broadcast=True)
延迟使用 Dask
期货也可以与 dask.delayed 一起正常工作。这里没有性能优势,但有些人更喜欢这个界面:
# futures = [c.submit(f, future, param) for param in params]
from dask import delayed
lazy_values = [delayed(f)(future, param) for param in params]
futures = c.compute(lazy_values)
【讨论】:
谢谢,很有用! scatter 命令返回的未来的用法(在您的示例中作为函数的参数)未在文档中解释。 在一个任务中(上面的f
)是否可以引用future
(x
的广播集群版本)而不将它作为参数传递给submit
?例如,在 spark 中,您可以广播一个变量,然后使用全局上下文在您的任务中引用它。以上是关于Dask分布式中如何高效提交大参数任务?的主要内容,如果未能解决你的问题,请参考以下文章
我们如何在 dask 分布式中为每个工作人员选择 --nthreads 和 --nprocs?