dask分布式数据帧上的慢len函数
Posted
技术标签:
【中文标题】dask分布式数据帧上的慢len函数【英文标题】:Slow len function on dask distributed dataframe 【发布时间】:2017-06-13 14:42:40 【问题描述】:我一直在测试如何使用 dask(20 核集群),我对调用 len 函数与通过 loc 切片的速度感到惊讶。
import dask.dataframe as dd
from dask.distributed import Client
client = Client('192.168.1.220:8786')
log = pd.read_csv('800000test', sep='\t')
logd = dd.from_pandas(log,npartitions=20)
#This is the code than runs slowly
#(2.9 seconds whilst I would expect no more than a few hundred millisencods)
print(len(logd))
#Instead this code is actually running almost 20 times faster than pandas
logd.loc[:'Host'].count().compute()
任何想法为什么会发生这种情况? len 跑得快对我来说并不重要,但我觉得由于不理解这种行为,我无法理解库。
所有绿色框都对应于“from_pandas”,而在 Matthew Rocklin http://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes 撰写的这篇文章中,调用图看起来更好(调用 len_chunk 明显更快,并且调用似乎没有被锁定并等待一个工人在开始另一个工人之前完成他的任务)
【问题讨论】:
【参考方案1】:好问题,这涉及到数据何时向上移动到集群并返回到客户端(您的 python 会话)。让我们看看您的计算的几个阶段
使用 Pandas 加载数据
这是您的 python 会话中的 Pandas 数据框,因此它显然仍在您的本地进程中。
log = pd.read_csv('800000test', sep='\t') # on client
转换为惰性 Dask.dataframe
这会将您的 Pandas 数据帧分解为 20 个 Pandas 数据帧,但这些数据帧仍在客户端上。 Dask 数据帧不会急切地将数据发送到集群。
logd = dd.from_pandas(log,npartitions=20) # still on client
计算长度
调用len
实际上会在这里引起计算(通常你会使用df.some_aggregation().compute()
。所以现在 Dask 开始了。首先它将你的数据移出集群(慢)然后它在所有 20 个分区上调用 len(快),它会聚合这些(快速),然后将结果下移到您的客户端以便打印。
print(len(logd)) # costly roundtrip client -> cluster -> client
分析
所以这里的问题是我们的 dask.dataframe 在本地 python 会话中仍然有它的所有数据。
使用本地线程调度程序比使用分布式调度程序要快得多。这应该以毫秒为单位计算
with dask.set_options(get=dask.threaded.get): # no cluster, just local threads
print(len(logd)) # stays on client
但大概您想知道如何扩展到更大的数据集,所以让我们以正确的方式来做吧。
将您的数据加载到工作人员身上
不要在客户端/本地会话上加载 Pandas,而是让 Dask 工作人员加载 csv 文件的位。这样就不需要客户端与工作人员之间的通信了。
# log = pd.read_csv('800000test', sep='\t') # on client
log = dd.read_csv('800000test', sep='\t') # on cluster workers
然而,与pd.read_csv
不同,dd.read_csv
是惰性的,所以它应该几乎立即返回。我们可以强制 Dask 使用 persist 方法实际进行计算
log = client.persist(log) # triggers computation asynchronously
现在集群开始运行并将您的数据直接加载到工作器中。这是相对较快的。请注意,当工作在后台进行时,此方法会立即返回。如果您想等到它完成,请致电wait
。
from dask.distributed import wait
wait(log) # blocks until read is done
如果您正在使用小型数据集进行测试并希望获得更多分区,请尝试更改块大小。
log = dd.read_csv(..., blocksize=1000000) # 1 MB blocks
无论如何,log
上的操作现在应该很快
len(log) # fast
编辑
针对this blogpost 上的问题,以下是我们对文件所在位置所做的假设。
通常,当您向dd.read_csv
提供文件名时,它假定所有工作人员都可以看到该文件。如果您使用的是网络文件系统,或者像 S3 或 HDFS 这样的全局存储,这是正确的。如果您使用的是网络文件系统,那么您将需要使用绝对路径(例如 /path/to/myfile.*.csv
),或者确保您的工作人员和客户端具有相同的工作目录。
如果不是这种情况,并且您的数据仅在您的客户端计算机上,那么您将不得不加载并分散它。
简单但次优
简单的方法就是做你最初做的事情,但坚持你的 dask.dataframe
log = pd.read_csv('800000test', sep='\t') # on client
logd = dd.from_pandas(log,npartitions=20) # still on client
logd = client.persist(logd) # moves to workers
这很好,但会导致沟通不太理想。
复杂但最佳
相反,您可以将数据显式分散到集群中
[future] = client.scatter([log])
不过,这涉及到更复杂的 API,所以我只会向您指出文档
http://distributed.readthedocs.io/en/latest/manage-computation.html http://distributed.readthedocs.io/en/latest/memory.html http://dask.pydata.org/en/latest/delayed-collections.html
【讨论】:
我刚遇到这个(len 触发计算)。我试图在计算任何东西之前构建我们的大部分计算图 - 有没有办法获得数据帧长度的延迟结果?series.isnull().sum() + series.notnull().sum()
是一种看似有效的 hack,但它所做的工作超出了必要的范围。
series.size()
以上是关于dask分布式数据帧上的慢len函数的主要内容,如果未能解决你的问题,请参考以下文章
深度学习核心技术精讲100篇(八十五)-Dask 分布高性能计算深入讲解