Dask:我如何将我的代码与 dask 延迟并行化?

Posted

技术标签:

【中文标题】Dask:我如何将我的代码与 dask 延迟并行化?【英文标题】:Dask: How would I parallelize my code with dask delayed? 【发布时间】:2017-07-21 21:21:46 【问题描述】:

这是我第一次尝试并行处理,我一直在研究 Dask,但我在实际编码时遇到了麻烦。

我查看了他们的示例和文档,我认为 dask.delayed 效果最好。我试图用 delay(function_name) 包装我的函数,或者添加一个 @delayed 装饰器,但我似乎无法让它正常工作。与其他方法相比,我更喜欢 Dask,因为它是用 python 制作的,而且它(假定的)简单性。我知道 dask 在 for 循环中不起作用,但他们说它可以在循环中起作用。

我的代码通过一个函数传递文件,该函数包含其他函数的输入,如下所示:

from dask import delayed
filenames = ['1.csv', '2.csv', '3.csv', etc. etc. ]
for count, name in enumerate(filenames)"
    name = name.split('.')[0]
    ....

然后做一些预处理:

    preprocess1, preprocess2 = delayed(read_files_and_do_some_stuff)(name)

然后我调用一个构造函数并将 pre_results 传递给函数调用:

    fc = FunctionCalls()
    Daily = delayed(fc.function_runs)(filename=name, stringinput='Daily',
                             input_data=pre_result1, model1=pre_result2)

我在这里所做的是将文件传递到 for 循环中,进行一些预处理,然后将文件传递到两个模型中。

关于如何实现并行化的想法或技巧?我开始遇到奇怪的错误,我不知道如何修复代码。代码按原样工作。我使用了一堆 pandas 数据帧、系列和 numpy 数组,我不希望返回并更改所有内容以使用 dask.dataframes 等。

我评论中的代码可能难以阅读。这是一种更格式化的方式。

在下面的代码中,当我输入 print(mean_squared_error) 时,我会得到:Delayed('mean_squared_error-3009ec00-7ff5-4865-8338-1fec3f9ed138')

from dask import delayed
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = ['file1.csv']

for count, name in enumerate(filenames):
    file1 = pd.read_csv(name)
    df = pd.DataFrame(file1)
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    mean_squared_error = delayed(mse)(observed, prediction)

【问题讨论】:

如果您能够生成MCVE,您可能会得到更好的回复 谢谢。我已经删除了一些代码以更多地突出这个问题。如果有任何不清楚的地方,请告诉我。 理想情况下,您应该展示一个其他人可以重现的最小失败示例。您当前的问题是“我正在尝试这样的事情,但事情不起作用”。一个更好的问题可能是“我只做了这几个步骤,这些步骤足够复杂以显示问题,但也足够简单,您可以轻松复制粘贴并且无需阅读大量代码即可快速理解,而我得到的正是以下错误。” 进一步简化我的代码真的没有用。我已经进行了编辑以使其更易于理解。我试图进一步简化它,但它似乎没有解决我问题的症结所在。如果你愿意,我可以把你推荐给 github。目前,我在读入文件行中收到类型错误。 TypeError:未指定长度的延迟对象不可迭代 我想这是一个示例代码(尽管对于回答我的问题并不是一个真正有用的示例......)从 dask import delay import pandas as pd from sklearn.metrics import mean_squared_error as mse filenames = [ 'file1.csv'] for count, name in enumerate(filenames): file1 = pd.read_csv(name) df = pd.DataFrame(file1) prediction = df['Close'][:-1] # 第二个 vec 是比较观察的真实值 = df['Close'][1:] mean_squared_error = delayed(mse)(observed, prediction) 【参考方案1】:

您需要调用 dask.compute 来最终计算结果。见dask.delayed documentation。

顺序码

import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = [...]

results = []
for count, name in enumerate(filenames):
    file1 = pd.read_csv(name)
    df = pd.DataFrame(file1)  # isn't this already a dataframe?
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    mean_squared_error = mse(observed, prediction)  
    results.append(mean_squared_error)

并行代码

import dask
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = [...]

delayed_results = []
for count, name in enumerate(filenames):
    df = dask.delayed(pd.read_csv)(name)
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    mean_squared_error = dask.delayed(mse)(observed, prediction)
    delayed_results.append(mean_squared_error)

results = dask.compute(*delayed_results)

【讨论】:

HI @MRocklin *delayed_results 在计算调用中做了什么? @B_Miner 我认为我们正在传递数组的地址,因此它将计算所有元素发布该数组的地址,直到用完为止。 @B_Miner Dask 列出了要执行的任务。当您调用计算时,它会并行执行它们。【参考方案2】:

IMO 比公认的答案更清晰的解决方案是这个 sn-p。

from dask import compute, delayed
import pandas as pd
from sklearn.metrics import mean_squared_error as mse
filenames = [...]

def compute_mse(file_name):
    df = pd.read_csv(file_name)
    prediction = df['Close'][:-1]
    observed = df['Close'][1:]
    return mse(observed, prediction)

delayed_results = [delayed(compute_mse)(file_name) for file_name in filenames]
mean_squared_errors = compute(*delayed_results, scheduler="processes")

【讨论】:

以上是关于Dask:我如何将我的代码与 dask 延迟并行化?的主要内容,如果未能解决你的问题,请参考以下文章

python dask DataFrame,支持(可简单并行化)行吗?

如何使用分布式 Dask 和预训练的 Keras 模型进行模型预测?

与 Dask 共享内存

Dask:如何有效地分配遗传搜索算法?

在具有非唯一索引列日期的 Dask 数据框中提取最新值

来自延迟 zip csv 的 Dask 数据帧