与 Dask 共享内存
Posted
技术标签:
【中文标题】与 Dask 共享内存【英文标题】:Shared Memory with Dask 【发布时间】:2021-05-26 12:17:05 【问题描述】:我通过 Dask 的星图并行化一个 CPU 密集型任务,该星图将一个大型嵌套列表和一个稀疏矩阵作为只读输入。为了避免嵌套列表和稀疏矩阵被重复复制到进程中,我想让这两个对象都可以通过共享内存访问。 Dask 是否提供 Dask 原生解决方案?
我正在使用 Python 3.8.5(默认,2020 年 9 月 3 日,21:29:08)[MSC v.1916 64 位 (AMD64)] 和 Spyder 4.2.1 在 64 位 Windows 机器上工作。作为一种变通解决方案,我已经使用多处理库中的共享内存进行了测试,它不适用于嵌套列表和稀疏矩阵。
在下文中,我将提供一个最小示例来展示我的用例:
import dask.bag as db
from dask.distributed import Client
from scipy.sparse import lil_matrix
from time import sleep
# Create a nested list
nested_list = [list(range(2)), list(range(3))]
# Create a sparse matrix
sparse_matrix = lil_matrix((1000, 1000))
sparse_matrix[0, :100] = range(100)
sparse_matrix[1, 100:200] = sparse_matrix[0, :100]
def toy_function(x, y, z, nested_list, sparse_matrix):
# After some time compute a sum.
sleep(10)
result = x + y + z + sparse_matrix[0, 0] + nested_list[0][0]
return result
data = [(1, 2, 3), (3, 4, 5), (5, 6, 7), (7, 8, 9), (9, 10, 11)]
b = db.from_sequence(data)
z = b.starmap(toy_function, nested_list = nested_list, sparse_matrix = sparse_matrix).compute()
# From the Dask Dashboard I conclude that nested_list and sparse_matrix are copied into each process.
【问题讨论】:
【参考方案1】:一种选择是使用client.scatter
预先分发这些对象(根据您的使用情况,您可能不希望在每个工作人员上都有一个副本,在这种情况下您可以省略broadcast=True
选项):
import dask.bag as db
import dask
from dask.distributed import Client
from scipy.sparse import lil_matrix
from time import sleep
client = Client()
# Create a nested list
nested_list = [list(range(2)), list(range(3))]
nested_list = client.scatter(nested_list, broadcast=True)
# Create a sparse matrix
sparse_matrix = lil_matrix((1000, 1000))
sparse_matrix[0, :100] = range(100)
sparse_matrix[1, 100:200] = sparse_matrix[0, :100]
sparse_matrix = client.scatter(sparse_matrix, broadcast=True)
def toy_function(x, y, z, **kwargs):
nested_list, sparse_matrix = kwargs['nested_list'], kwargs['sparse_matrix']
# After some time compute a sum.
sleep(1)
result = x + y + z + sparse_matrix[0, 0] + nested_list[0][0]
return result
data = [(1, 2, 3), (3, 4, 5), (5, 6, 7), (7, 8, 9), (9, 10, 11)]
b = db.from_sequence(data)
z = b.starmap(toy_function, nested_list = nested_list, sparse_matrix = sparse_matrix).compute()
print(z)
或者,您也可以将这些对象转换为delayed
,因此只传递对延迟对象的引用。这意味着您将拥有 dask.delayed
包装器,而不是 client.scatter
:
nested_list = dask.delayed(nested_list)
sparse_matrix = dask.delayed(sparse_matrix)
【讨论】:
我将 client.scatter() 解决方案设置broadcast
测试为其默认值 False
。但是,检查任务管理器和类似的 Dask 仪表板,我得出结论,内存不是共享的。与dask.delayed
包装器类似,我观察到大型嵌套列表被复制到每个进程中。正如所提供的代码 sn-p 中所建议的,我实现了这两种解决方案。
嗯,很有趣。在这两个 sn-ps 中,传递给函数的是未来值或延迟值(两者都是轻量级的,不依赖于底层对象的大小)......以上是关于与 Dask 共享内存的主要内容,如果未能解决你的问题,请参考以下文章