以内存有效的方式将大型 csv 读入稀疏的 pandas 数据帧
Posted
技术标签:
【中文标题】以内存有效的方式将大型 csv 读入稀疏的 pandas 数据帧【英文标题】:Read a large csv into a sparse pandas dataframe in a memory efficient way 【发布时间】:2015-10-31 13:19:33 【问题描述】:pandas read_csv
函数似乎没有稀疏选项。我有包含大量零的 csv 数据(它压缩得非常好,并且去掉任何 0
值会将其缩小到几乎是原始大小的一半)。
我尝试先使用read_csv
将其加载到密集矩阵中,然后调用to_sparse
,但这需要很长时间并且会阻塞文本字段,尽管大部分数据都是浮点数。如果我首先调用 pandas.get_dummies(df)
将分类列转换为 1 和零,然后调用 to_sparse(fill_value=0)
这将花费大量时间,这比我对包含 1200 万个条目(大部分为零)的大多数数字表的预期要长得多。即使我从原始文件中去除零并调用to_sparse()
(这样填充值为NaN),也会发生这种情况。无论我通过kind='block'
还是kind='integer'
,都会发生这种情况。
除了手动构建稀疏数据帧之外,是否有一种很好、流畅的方法可以直接加载稀疏 csv 而不会占用大量不必要的内存?
下面是一些代码,用于创建包含 3 列浮点数据和 1 列文本数据的示例数据集。大约 85% 的浮点值为零,CSV 的总大小约为 300 MB,但您可能希望将其变大以真正测试内存限制。
np.random.seed(123)
df=pd.DataFrame( np.random.randn(10000000,3) , columns=list('xyz') )
df[ df < 1.0 ] = 0.0
df['txt'] = np.random.choice( list('abcdefghij'), size=len(df) )
df.to_csv('test.csv',index=False)
这里有一个简单的阅读方式,但希望有更好、更有效的方式:
sdf = pd.read_csv( 'test.csv', dtype='txt':'category' ).to_sparse(fill_value=0.0)
Edit to Add(来自 JohnE):如果可能,请在您的答案中提供一些有关读取大型 CSV 的相关性能统计数据,包括有关您如何测量内存效率的信息(尤其是在内存效率更难的情况下)比时钟时间来衡量)。特别要注意的是,较慢的(时钟时间)答案可能是这里的最佳答案,如果内存效率更高。
【问题讨论】:
您是否尝试过将字典传递给read_csv
kwarg dtype
?我的预感是,如果您将列 dtypes 声明为 int
,这将大大加快 to_sparse
方法的性能。 Ctrl+f
'dtype' here
@user2734178 我有。不幸的是,它没有帮助。
我想最节省内存的方法是读取块,然后将每个块转换为稀疏。有点痛苦,但你永远不会在内存中拥有完整的未压缩数据集。
@JohnE 是的,恐怕我将不得不这样做。 :-/
@JohnE 如果您能编写答案,那就太好了。这听起来是最好的方法,但很多用户(包括我)不知道从哪里开始!
【参考方案1】:
这是一个主要作为基准提供的答案。希望有比这更好的方法。
chunksize = 1000000 # perhaps try some different values here?
chunks = pd.read_csv( 'test.csv', chunksize=chunksize, dtype='txt':'category' )
sdf = pd.concat( [ chunk.to_sparse(fill_value=0.0) for chunk in chunks ] )
正如@acushner 所说,您可以改为将其作为生成器表达式:
sdf = pd.concat( chunk.to_sparse(fill_value=0.0) for chunk in chunks )
似乎一致认为这比列表比较方式更好,尽管在我的测试中我没有看到任何大的差异,但也许你可能会使用不同的数据。
我希望报告一些关于各种方法的内存分析,但难以获得一致的结果,我怀疑是因为 python 总是在后台清理内存,导致结果中添加了一些随机噪声。 (在对 Jake 回答的评论中,他建议在每个 %memit
之前重新启动 jupyter 内核以获得更一致的结果,但我还没有尝试过。)
但我确实一直发现(使用%%memit
)上面的分块读取和@jakevdp 的 dask 方法都使用了大约一半内存的东西,作为 OP 中的幼稚方法。有关 profiling 的更多信息,您应该查看 Jake 的《Python 数据科学手册》一书中的“Profiling and Timing Code”。
【讨论】:
仅供参考,您可以创建一个空的DataFrame
,然后附加到它,这样您就不必同时在内存中拥有所有块。
@JohnMoeller 这不是一个好主意。每次附加到数据帧时,它都必须重新分配整个数据集以使其连续。另一方面,在 concat
调用中使用 gen expr 而不是 list comp。
明白了,我的意思是,从概念上和 Python 上来说,在这里使用 gen expr 更有意义。如果您使用列表组合,您将创建一个列表,然后立即将其丢弃。另外,gen expr 更干净。您的回答很好,我没有理由添加我的一个,出于上述原因,我只会使用 gen expr。
@johnmoeller 另一条信息:如果 gen expr 是函数调用的唯一参数,则不需要额外的一组括号(例如sum(i for i in range(10))
)【参考方案2】:
我可能会通过使用dask 以流式方式加载您的数据来解决这个问题。例如,您可以按如下方式创建一个 dask 数据框:
import dask.dataframe as ddf
data = ddf.read_csv('test.csv')
这个data
对象此时实际上还没有做任何事情;它只包含一个“配方”,可以以可管理的块从磁盘读取数据帧。如果要物化数据,可以拨打compute()
:
df = data.compute().reset_index(drop=True)
此时,您有一个标准的 pandas 数据框(我们称其为 reset_index
,因为默认情况下每个分区都是独立索引的)。结果相当于直接调用pd.read_csv
得到的结果:
df.equals(pd.read_csv('test.csv'))
# True
dask 的好处是您可以在此“配方”中添加说明以构建您的数据框;例如,您可以按如下方式使数据的每个分区变得稀疏:
data = data.map_partitions(lambda part: part.to_sparse(fill_value=0))
此时调用compute()
会构造一个稀疏数组:
df = data.compute().reset_index(drop=True)
type(df)
# pandas.core.sparse.frame.SparseDataFrame
分析
要检查 dask 方法与 raw pandas 方法的比较,让我们进行一些线路分析。我将使用lprun
和mprun
,如here 所述(完全披露:这是我自己书中的一部分)。
假设你在 Jupyter notebook 中工作,你可以这样运行它:
首先,创建一个包含我们要执行的基本任务的单独文件:
%%file dask_load.py
import numpy as np
import pandas as pd
import dask.dataframe as ddf
def compare_loads():
df = pd.read_csv('test.csv')
df_sparse = df.to_sparse(fill_value=0)
df_dask = ddf.read_csv('test.csv', blocksize=10E6)
df_dask = df_dask.map_partitions(lambda part: part.to_sparse(fill_value=0))
df_dask = df_dask.compute().reset_index(drop=True)
接下来让我们逐行分析计算时间:
%load_ext line_profiler
from dask_load import compare_loads
%lprun -f compare_loads compare_loads()
我得到以下结果:
Timer unit: 1e-06 s
Total time: 13.9061 s
File: /Users/jakevdp/dask_load.py
Function: compare_loads at line 6
Line # Hits Time Per Hit % Time Line Contents
==============================================================
6 def compare_loads():
7 1 4746788 4746788.0 34.1 df = pd.read_csv('test.csv')
8 1 769303 769303.0 5.5 df_sparse = df.to_sparse(fill_value=0)
9
10 1 33992 33992.0 0.2 df_dask = ddf.read_csv('test.csv', blocksize=10E6)
11 1 7848 7848.0 0.1 df_dask = df_dask.map_partitions(lambda part: part.to_sparse(fill_value=0))
12 1 8348217 8348217.0 60.0 df_dask = df_dask.compute().reset_index(drop=True)
对于上面的示例数组,我们看到大约 60% 的时间用于 dask 调用,而大约 40% 的时间用于 pandas 调用。这告诉我们,对于这个任务,dask 比 pandas 慢大约 50%:这是意料之中的,因为数据分区的分块和重组会导致一些额外的开销。
dask 的亮点在于内存使用情况:让我们使用mprun
进行逐行内存配置:
%load_ext memory_profiler
%mprun -f compare_loads compare_loads()
我机器上的结果是这样的:
Filename: /Users/jakevdp/dask_load.py
Line # Mem usage Increment Line Contents
================================================
6 70.9 MiB 70.9 MiB def compare_loads():
7 691.5 MiB 620.6 MiB df = pd.read_csv('test.csv')
8 828.8 MiB 137.3 MiB df_sparse = df.to_sparse(fill_value=0)
9
10 806.3 MiB -22.5 MiB df_dask = ddf.read_csv('test.csv', blocksize=10E6)
11 806.4 MiB 0.1 MiB df_dask = df_dask.map_partitions(lambda part: part.to_sparse(fill_value=0))
12 947.9 MiB 141.5 MiB df_dask = df_dask.compute().reset_index(drop=True)
我们看到最终的 pandas 数据帧大小约为 ~140MB,但 pandas 在将数据读入临时密集对象时使用了 ~620MB。
另一方面,dask 在加载数组和构建最终稀疏结果时仅使用了大约 140MB。如果您正在读取其密集大小与系统上可用内存相当的数据,dask 具有明显的优势,尽管计算时间慢了约 50%。
但对于处理大数据,您不应止步于此。大概您正在对数据进行一些操作,而 dask 数据框抽象允许您在实现数据之前执行这些操作(即将它们添加到“配方”)。因此,如果您对数据所做的操作涉及算术、聚合、分组等,您甚至无需担心稀疏存储:只需对 dask 对象执行这些操作,最后调用 compute()
,然后dask 会以一种内存有效的方式应用它们。
因此,例如,我可以使用 dask 数据帧计算每列的 max()
,而不必一次将整个内容加载到内存中:
>>> data.max().compute()
x 5.38114
y 5.33796
z 5.25661
txt j
dtype: object
直接使用 dask 数据帧可以让您不必担心数据表示,因为您可能永远不必一次将所有数据加载到内存中。
祝你好运!
【讨论】:
非常感谢您的回答!我应该注意到,我发现很难在内存时序上获得一致的结果。我想回到你书中的相关部分(数据科学手册中的“分析和计时代码”)——免费插件;-) 并且主要使用 %memit 但同样,无法获得真正一致的结果(我'将在我的回答中详细说明) 笔记本中的 memit 有点麻烦——你需要在再次分析相同的函数之前重新启动内核,否则你会得到奇怪的结果。 好的,如果这很重要,我实际上是在 qt 控制台中做的。当我有足够的时间重置内核等时,我会尝试解决这个问题。 如何遍历 dask 数据框中的行? to_sparse 似乎不再是一回事了以上是关于以内存有效的方式将大型 csv 读入稀疏的 pandas 数据帧的主要内容,如果未能解决你的问题,请参考以下文章
根据标准从大型数据集中读取特定数据,以避免将整个文件读入内存