在python中读取15 M行csv文件的有效方法

Posted

技术标签:

【中文标题】在python中读取15 M行csv文件的有效方法【英文标题】:Efficient way to read 15 M lines csv files in python 【发布时间】:2019-11-12 05:07:20 【问题描述】:

对于我的应用程序,我需要读取多个文件,每个文件有 15 M 行,将它们存储在 DataFrame 中,并将 DataFrame 保存为 HDFS5 格式。

我已经尝试过不同的方法,特别是具有 chunksize 和 dtype 规范的 pandas.read_csv 和 dask.dataframe。他们都需要大约 90 秒来处理 1 个文件,所以我想知道是否有办法以所述方式有效地处理这些文件。下面,我展示了一些我已经完成的测试的代码。

import pandas as pd
import dask.dataframe as dd
import numpy as np
import re 

# First approach
store = pd.HDFStore('files_DFs.h5')

chunk_size = 1e6

df_chunk = pd.read_csv(file,
                sep="\t",
                chunksize=chunk_size,
                usecols=['a', 'b'],
                converters="a": lambda x: np.float32(re.sub(r"[^\d.]", "", x)),\
                            "b": lambda x: np.float32(re.sub(r"[^\d.]", "", x)),
                skiprows=15
           )              
chunk_list = [] 


for chunk in df_chunk:
      chunk_list.append(chunk)


df = pd.concat(chunk_list, ignore_index=True)

store[dfname] = df
store.close()

# Second approach

df = dd.read_csv(
        file,
        sep="\t",
        usecols=['a', 'b'],
        converters="a": lambda x: np.float32(re.sub(r"[^\d.]", "", x)),\
                    "b": lambda x: np.float32(re.sub(r"[^\d.]", "", x)),
        skiprows=15
     )
store.put(dfname, df.compute())
store.close()

这是文件的样子(空格由文字制表符组成):

a   b
599.998413  14.142895
599.998413  20.105534
599.998413  6.553850
599.998474  27.116098
599.998474  13.060312
599.998474  13.766775
599.998596  1.826706
599.998596  18.275938
599.998718  20.797491
599.998718  6.132450)
599.998718  41.646194
599.998779  19.145775

【问题讨论】:

提供样本数据 请告诉我这是否有效:ifmafr0-my.sharepoint.com/:x:/g/personal/… 我收到了size is too big (>30 MB) 错误。您可以在问题正文中添加 5-10 行。 为什么要使用 read_csv 方法读取 XML?如果这段代码有效(它不在我的计算机上),只需删除这些正则表达式并在之前编译它们(或者更好,使用 str.replace 代替) 其实我读的是.txt文件。我只是以这种格式插入了一些值作为示例。使用正则表达式是因为文件可能包含一些值,如“10.042)”,所以我不想阅读“)”。 【参考方案1】:

好吧,我的发现与 pandas 没有太大关系,而是一些常见的陷阱。

Your code: 
(genel_deneme) ➜  derp time python a.py
python a.py  38.62s user 0.69s system 100% cpu 39.008 total
    预编译你的正则表达式
Replace re.sub(r"[^\d.]", "", x) with precompiled version and use it in your lambdas
Result : 
(genel_deneme) ➜  derp time python a.py 
python a.py  26.42s user 0.69s system 100% cpu 26.843 total
    尝试找到一种更好的方法,然后直接使用 np.float32,因为它比我想象的要慢 6-10 倍。以下不是您想要的,但我只想在这里展示这个问题。
replace np.float32 with float and run your code. 
My Result:  
(genel_deneme) ➜  derp time python a.py
python a.py  14.79s user 0.60s system 102% cpu 15.066 total

找到另一种使用浮点数实现结果的方法。 更多关于这个问题https://***.com/a/6053175/37491

    如果可以,将您的文件和工作分配给子流程。您已经处理了恒定大小的单独块。因此,基本上您可以使用多处理或线程在单独的进程中划分文件并处理作业。

【讨论】:

【参考方案2】:

首先,让我们回答问题的标题

1- 如何有效读取包含浮点数的 csv 的 15M 行

我建议你使用modin:

生成样本数据:

import modin.pandas as mpd
import pandas as pd
import numpy as np

frame_data = np.random.randint(0, 10_000_000, size=(15_000_000, 2)) 
pd.DataFrame(frame_data*0.0001).to_csv('15mil.csv', header=False)
!wc 15mil*.csv ; du -h 15mil*.csv

    15000000   15000000  480696661 15mil.csv
    459M    15mil.csv

现在进入基准测试:

%%timeit -r 3 -n 1 -t
global df1
df1 = pd.read_csv('15mil.csv', header=None)
    9.7 s ± 95.1 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
%%timeit -r 3 -n 1 -t
global df2
df2 = mpd.read_csv('15mil.csv', header=None)
    3.07 s ± 685 ms per loop (mean ± std. dev. of 3 runs, 1 loop each)
(df2.values == df1.values).all()
    True

我们可以看到 modin 在我的设置中大约快 3 倍


现在回答您的具体问题

2- 清理一个包含非数字字符的csv文件,然后读取它

正如人们所指出的,您的瓶颈可能是转换器。您正在调用这些 lambda 3000 万次。在这种规模下,即使是函数调用开销也变得不小。

让我们来解决这个问题。

生成脏数据集:

!sed 's/.\4\/&)/g' 15mil.csv > 15mil_dirty.csv

方法

首先,我尝试使用带有转换器参数的 modin。然后,我尝试了一种不同的方法来减少调用正则表达式的次数:

首先,我将创建一个类似文件的对象,通过您的正则表达式过滤所有内容:

class FilterFile():
    def __init__(self, file):
        self.file = file
    def read(self, n):
        return re.sub(r"[^\d.,\n]", "", self.file.read(n))
    def write(self, *a): return self.file.write(*a) # needed to trick pandas
    def __iter__(self, *a): return self.file.__iter__(*a) # needed

然后我们将它作为 read_csv 中的第一个参数传递给 pandas:

with open('15mil_dirty.csv') as file:
    df2 = pd.read_csv(FilterFile(file))

基准测试:

%%timeit -r 1 -n 1 -t
global df1
df1 = pd.read_csv('15mil_dirty.csv', header=None,
        converters=0: lambda x: np.float32(re.sub(r"[^\d.]", "", x)),
                    1: lambda x: np.float32(re.sub(r"[^\d.]", "", x))
           )
    2min 28s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
%%timeit -r 1 -n 1 -t
global df2
df2 = mpd.read_csv('15mil_dirty.csv', header=None,
        converters=0: lambda x: np.float32(re.sub(r"[^\d.]", "", x)),
                    1: lambda x: np.float32(re.sub(r"[^\d.]", "", x))
           )
    38.8 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
%%timeit -r 1 -n 1 -t
global df3
df3 = pd.read_csv(FilterFile(open('15mil_dirty.csv')), header=None,)
    1min ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

看来 modin 又赢了! 不幸的是 modin 还没有实现从缓冲区读取,所以我设计了终极方法。

终极方法:

%%timeit -r 1 -n 1 -t
with open('15mil_dirty.csv') as f, open('/dev/shm/tmp_file', 'w') as tmp:
    tmp.write(f.read().translate(ord(i):None for i in '()'))
df4 = mpd.read_csv('/dev/shm/tmp_file', header=None)
    5.68 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

这使用translate,它比re.sub 快得多,还使用/dev/shm,这是ubuntu(和其他linuxes)通常提供的内存文件系统。在那里写入的任何文件都不会写入磁盘,因此速度很快。 最后,它使用 modin 来读取文件,绕过 modin 的缓冲区限制。 这种方法比您的方法快 30 倍,而且非常简单。

【讨论】:

以上是关于在python中读取15 M行csv文件的有效方法的主要内容,如果未能解决你的问题,请参考以下文章

使用python读取1TB HDFS csv文件的有效方法是啥

逐行读取 CSV 文件 python

使用 numpy / pandas 读取 Python 中 CSV 文件的最后 N 行

使用 numpy / pandas 读取 Python 中 CSV 文件的最后 N 行

用python读取一个文件夹下的所有CSV文件里某一列数据中最大值,将此最大值所在行截取到新CSV文件中?

有效地将最后 'n' 行 CSV 读入 DataFrame