Dask 数据帧大于内存
Posted
技术标签:
【中文标题】Dask 数据帧大于内存【英文标题】:Dask dataframe larger than memory 【发布时间】:2020-10-16 11:39:54 【问题描述】:我是 Dask 的新手,我发现它非常有用,但我有一个问题我还没有解决。
我有一个大于内存的数据集,我想从列中删除重复值。
问题在于,删除后数据集仍将大于内存。因此,需要通过文件计算结果,直接保存到磁盘中。
当然,我可以构建一个代码来手动执行此删除操作,但我想知道 Dask 是否已经实现了此功能。
这是我的代码:
from dask.distributed import Client
import dask.dataframe as dd
client = Client(memory_limit='8GB') # I've tried without this limit
data = dd.read_csv("path_to_file", dtype=
'id': 'Int64'
, sample=1000)
data.drop_duplicates(subset=['text'])
results = data.compute() # <- Here is the problem
results.to_csv("pathout", index=False)
当我调用计算时,结果是一个 DataFrame pandas,在这种情况下,它比内存大。我收到了很多:
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
然后执行失败并显示“KilledWorker”
编辑:
自包含示例:
import numpy as np
import pandas as pd
from dask.distributed import Client
import dask.dataframe as dd
# Creates about 2Gb of data
data = np.random.randint(0, 10000, size=(2000000,200))
pd.DataFrame(data).to_csv('test_in.csv', index=False)
# If you want to run on terminal, uncomment the next line and identy the rest of the code
# if __name__ == '__main__':
# To test, limit dask to 1Gb
client = Client(n_workers=1, memory_limit='1GB')
df = dd.read_csv('test_in.csv', blocksize='16MB')
results = df.drop_duplicates()
results.to_csv('test_out.csv', index=False)
client.close()
【问题讨论】:
您使用的是Jupyter lab
/notebook
还是Google Colab
?因为在Google Colab
中,您可以免费获得大约 12 GB 的 RAM,所以可能会在 "KilledWorker"
时执行不会失败
【参考方案1】:
from dask.distributed import Client
import dask.dataframe as dd
client = Client(memory_limit='8GB')
data = dd.read_csv("path_to_file", dtype='id': 'Int64', sample=1000)
results = data.drop_duplicates(subset=['A']) # Don't call compute here
results.to_csv("pathout", index=False) # Write operations automatically call compute
.compute() 将返回一个 Pandas 数据框,然后 Dask 就消失了。您可以使用 Dask 中的 .to_csv() 函数,它会为每个分区保存一个文件。
只需删除 .compute() ,如果每个分区都适合内存,它将起作用。
哦,你需要分配 .drop_duplicates() 的结果。
【讨论】:
我尝试了这个更改,分配结果并删除了 compute()。 results = data.drop_duplicates(subset=['text']) results.to_csv("pathout", index=False) 查看客户端的仪表板,我认为一切正常,因为出现了 read_csv、drop_duplicated 和 _write_csv 的任务。但是这个过程仍然以 KilledWorker 结束 一个分区/块大于内存。降低 read_csv 函数中分区的大小。在 Dask 文档中查找。同时降低 memory_limit 我在read_csv中尝试了不同的blocksize值,32MB、16MB、8MB、1MB,错误总是一样的。但我注意到了别的东西。查看执行图,有一些级别的任务,首先是读取数据,然后是 drop-duplicates-chunk,然后是 drop-duplicates-combine。 drop-duplicates-combine比较大,好像是把part的所有数据都累积起来了。可能是这些组合超出了内存限制。 你能把执行图贴在这里吗?尝试更像 1GB 的值。你也降低了内存限制吗?此外,您可能希望使用 Dask 性能报告来更好地显示正在发生的事情 (distributed.dask.org/en/latest/…)。如果您真的希望我/我们帮助您,请编写一个独立的最小工作示例 (matthewrocklin.com/blog/work/2018/02/28/minimal-bug-reports)。例如,您可以使用随机数据创建一个 csv 文件。 我在答案中添加了一个代码示例。你能尝试执行吗?【参考方案2】:我认为你的工人被杀了,因为 drop_duplicates 将 df.npartitions 重置为 1。 尝试在前后打印 df.npartitions 以确保。
您可以尝试的一件事是 results = df.drop_duplicates(split_out=df.npartitions) 这仍然需要很长时间才能计算困难..
【讨论】:
我不这么认为。正如你所说,我包括了 df.npartitions 打印,但是这个数字在执行过程中没有改变。即使在 KilledWorker 之后,它也没有改变。我还尝试在 drop_duplicates 中包含 split_out 参数,但它不起作用。错误是一样的。以上是关于Dask 数据帧大于内存的主要内容,如果未能解决你的问题,请参考以下文章