为啥来自 s3 的 dask read_csv 保留了这么多内存?
Posted
技术标签:
【中文标题】为啥来自 s3 的 dask read_csv 保留了这么多内存?【英文标题】:Why is dask read_csv from s3 keeping so much memory?为什么来自 s3 的 dask read_csv 保留了这么多内存? 【发布时间】:2018-08-03 21:03:36 【问题描述】:我正在使用 dask(替代 SQL 查询)从 s3 中读取一些压缩后的数据。但是,看起来有一些数据文件的缓存,或保存在系统内存中的解压缩文件。注意,这应该是可运行的,这里的测试数据来自公共 s3 存储桶中的 pandas 测试套件。
import dask.dataframe as dd
import pandas as pd
import psutil as ps
import os
#for easier vis
mb = 1048576
def mytestfunc(file):
process = ps.Process(os.getpid())
print('initial memory: 0'.format(process.memory_info().rss/mb))
data = dd.read_csv(file, compression = 'gzip', blocksize = None, storage_options = 'anon':True)
print('dask plan memory: 0'.format(process.memory_info().rss/mb))
data = data.compute()
print('data in memory: 0'.format(process.memory_info().rss/mb))
print('data frame usage: 0'.format(data.memory_usage(deep=True).sum()/mb))
return data
process = ps.Process(os.getpid())
print('before function call: 0'.format(process.memory_info().rss/mb))
out = mytestfunc('s3://pandas-test/large_random.csv.gz')
print('After function call: 0'.format(process.memory_info().rss/mb))
# out = mytestfunc('s3://pandas-test/tips.csv.gz')
# print('After smaller function call: 0'.format(process.memory_info().rss/mb))
这给了我:
before function call: 76.984375
initial memory: 76.984375
dask plan memory: 92.9921875
data in memory: 224.71484375
data frame usage: 38.14704895019531
After function call: 224.7265625
天真地,我希望“函数调用之后”是“函数调用之前”加上数据帧和一些开销。在这里,gzip 是 43mb,导致大约 90mb 的开销,在我的实际示例中,这个额外的部分是 10gb 数据帧的大约 50gb 额外内存。
如果您在另一个较小的文件上重新运行,您可以看到内存已释放 - 取消对较小文件的重新运行的注释以查看它。这也表明增加是由于文件大小 - 您可以切换顺序并先运行“提示”,内存保持在 ~90mb。
我猜 dask、s3fs 或 pandas 将文件或解压缩的内容保存在某处的缓冲区中,但我无法追踪它以清除它。
关于如何减少这种内存使用或释放缓冲区的任何想法?
编辑:我的一些真实数据的上述输出示例 - 32 个 gzip 压缩文件:
before function call: 70.69921875
initial memory: 70.69921875
dask plan memory: 80.16015625
data in memory: 33991.69921875
data frame usage: 10824.553115844727
After function call: 33991.69921875
我知道 dask 在相同的 32 个文件上的峰值内存使用率比 pandas 循环更高,但我仍然不明白为什么它没有被释放。
【问题讨论】:
在使这个可重现方面做得很好 - 似乎是特定于 dask 的,如果我直接从 pandas(它也使用 s3fs)读取,内存模式会按照你的想法进行 很好 - 我使用的是 dask 而不是 pandas,因为我有一个包含 32 个文件的数据库卸载,而不仅仅是 1。现在添加赏金! 示例中out
的大小为38mb,但函数调用后的内存使用量增加了~130mb,看起来好像某处存在未清除的缓存或引用。在我的实际数据中,对于 10gb 的数据,这几乎是 30gb。 Pandas 本身不做同样的事情,只是根据数据大小增加内存使用量
我可以确认这种行为在本地 .gz 文件中可以看到,并且在同一个文件未压缩后也可以看到(尽管后一种情况下的内存增加较小)。
如果您尝试使用单线程调度程序dask.set_globals(get=dask.local.get_sync)
,那么问题就会消失。我怀疑dask.threaded.default_pool
的默认线程池中存在一些问题。可以通过在没有 Dask 的情况下使用 ThreadPool.get_async
来隔离问题并查看问题是否仍然存在
【参考方案1】:
在线程中使用pandas.read_csv
时,Python 进程似乎泄漏了一点内存。我已将其简化为pandas.read_csv
和concurrent.futures.ThreadPoolExecutor
的问题。这是在 Pandas 问题跟踪器上提出的:https://github.com/pandas-dev/pandas/issues/19941
# imports
import pandas as pd
import numpy as np
import time
import psutil
from concurrent.futures import ThreadPoolExecutor
# prep
process = psutil.Process()
e = ThreadPoolExecutor(8)
# prepare csv file, only need to run once
pd.DataFrame(np.random.random((100000, 50))).to_csv('large_random.csv')
# baseline computation making pandas dataframes with threasds. This works fine
def f(_):
return pd.DataFrame(np.random.random((1000000, 50)))
print('before:', process.memory_info().rss // 1e6, 'MB')
list(e.map(f, range(8)))
time.sleep(1) # let things settle
print('after:', process.memory_info().rss // 1e6, 'MB')
# before: 57.0 MB
# after: 56.0 MB
# example with read_csv, this leaks memory
print('before:', process.memory_info().rss // 1e6, 'MB')
list(e.map(pd.read_csv, ['large_random.csv'] * 8))
time.sleep(1) # let things settle
print('after:', process.memory_info().rss // 1e6, 'MB')
# before: 58.0 MB
# after: 323.0 MB
【讨论】:
以上是关于为啥来自 s3 的 dask read_csv 保留了这么多内存?的主要内容,如果未能解决你的问题,请参考以下文章