dask分布式数据帧上的慢len函数

Posted

技术标签:

【中文标题】dask分布式数据帧上的慢len函数【英文标题】:Slow len function on dask distributed dataframe 【发布时间】:2017-06-13 14:42:40 【问题描述】:

我一直在测试如何使用 dask(20 核集群),我对调用 len 函数与通过 loc 切片的速度感到惊讶。

import dask.dataframe as dd
from dask.distributed import Client
client = Client('192.168.1.220:8786')

log = pd.read_csv('800000test', sep='\t')
logd = dd.from_pandas(log,npartitions=20)

#This is the code than runs slowly 
#(2.9 seconds whilst I would expect no more than a few hundred millisencods)

print(len(logd))

#Instead this code is actually running almost 20 times faster than pandas
logd.loc[:'Host'].count().compute()

任何想法为什么会发生这种情况? len 跑得快对我来说并不重要,但我觉得由于不理解这种行为,我无法理解库。

所有绿色框都对应于“from_pandas”,而在 Matthew Rocklin http://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes 撰写的这篇文章中,调用图看起来更好(调用 len_chunk 明显更快,并且调用似乎没有被锁定并等待一个工人在开始另一个工人之前完成他的任务)

【问题讨论】:

【参考方案1】:

好问题,这涉及到数据何时向上移动到集群并返回到客户端(您的 python 会话)。让我们看看您的计算的几个阶段

使用 Pandas 加载数据

这是您的 python 会话中的 Pandas 数据框,因此它显然仍在您的本地进程中。

log = pd.read_csv('800000test', sep='\t')  # on client

转换为惰性 Dask.dataframe

这会将您的 Pandas 数据帧分解为 20 个 Pandas 数据帧,但这些数据帧仍在客户端上。 Dask 数据帧不会急切地将数据发送到集群。

logd = dd.from_pandas(log,npartitions=20)  # still on client

计算长度

调用len 实际上会在这里引起计算(通常你会使用df.some_aggregation().compute()。所以现在 Dask 开始了。首先它将你的数据移出集群(慢)然后它在所有 20 个分区上调用 len(快),它会聚合这些(快速),然后将结果下移到您的客户端以便打印。

print(len(logd))  # costly roundtrip client -> cluster -> client

分析

所以这里的问题是我们的 dask.dataframe 在本地 python 会话中仍然有它的所有数据。

使用本地线程调度程序比使用分布式调度程序要快得多。这应该以毫秒为单位计算

with dask.set_options(get=dask.threaded.get):  # no cluster, just local threads
    print(len(logd))  # stays on client

但大概您想知道如何扩展到更大的数据集,所以让我们以正确的方式来做吧。

将您的数据加载到工作人员身上

不要在客户端/本地会话上加载 Pandas,而是让 Dask 工作人员加载 csv 文件的位。这样就不需要客户端与工作人员之间的通信了。

# log = pd.read_csv('800000test', sep='\t')  # on client
log = dd.read_csv('800000test', sep='\t')    # on cluster workers

然而,与pd.read_csv 不同,dd.read_csv 是惰性的,所以它应该几乎立即返回。我们可以强制 Dask 使用 persist 方法实际进行计算

log = client.persist(log)  # triggers computation asynchronously

现在集群开始运行并将您的数据直接加载到工作器中。这是相对较快的。请注意,当工作在后台进行时,此方法会立即返回。如果您想等到它完成,请致电wait

from dask.distributed import wait
wait(log)  # blocks until read is done

如果您正在使用小型数据集进行测试并希望获得更多分区,请尝试更改块大小。

log = dd.read_csv(..., blocksize=1000000)  # 1 MB blocks

无论如何,log 上的操作现在应该很快

len(log)  # fast

编辑

针对this blogpost 上的问题,以下是我们对文件所在位置所做的假设。

通常,当您向dd.read_csv 提供文件名时,它假定所有工作人员都可以看到该文件。如果您使用的是网络文件系统,或者像 S3 或 HDFS 这样的全局存储,这是正确的。如果您使用的是网络文件系统,那么您将需要使用绝对路径(例如 /path/to/myfile.*.csv),或者确保您的工作人员和客户端具有相同的工作目录。

如果不是这种情况,并且您的数据仅在您的客户端计算机上,那么您将不得不加载并分散它。

简单但次优

简单的方法就是做你最初做的事情,但坚持你的 dask.dataframe

log = pd.read_csv('800000test', sep='\t')  # on client
logd = dd.from_pandas(log,npartitions=20)  # still on client
logd = client.persist(logd)  # moves to workers

这很好,但会导致沟通不太理想。

复杂但最佳

相反,您可以将数据显式分散到集群中

[future] = client.scatter([log])

不过,这涉及到更复杂的 API,所以我只会向您指出文档

http://distributed.readthedocs.io/en/latest/manage-computation.html http://distributed.readthedocs.io/en/latest/memory.html http://dask.pydata.org/en/latest/delayed-collections.html

【讨论】:

我刚遇到这个(len 触发计算)。我试图在计算任何东西之前构建我们的大部分计算图 - 有没有办法获得数据帧长度的延迟结果? series.isnull().sum() + series.notnull().sum() 是一种看似有效的 hack,但它所做的工作超出了必要的范围。 series.size()

以上是关于dask分布式数据帧上的慢len函数的主要内容,如果未能解决你的问题,请参考以下文章

深度学习核心技术精讲100篇(八十五)-Dask 分布高性能计算深入讲解

如何使用分布式 Dask 和预训练的 Keras 模型进行模型预测?

同一数据帧上的多个总和

python Dask分布式安装

Airflow 中文文档:用Dask扩展

如何在 dask 分布式工作人员上设置日志记录?