pandas.io.common.CParserError: Error tokenizing data. C error: Buffer overflow caught - possible malformed input file

Posted: 2016-12-10 00:12:24

Question:

I have large csv files, each over 10 MB in size, and around 50 or more of them. These inputs have more than 25 columns and more than 50K rows.

All of them have the same header, and I am trying to merge them into a single csv with the header written only once.

Option One. Code: works for small csvs -- more than 25 columns, but file sizes in the KB range.

import pandas as pd
import glob

interesting_files = glob.glob("*.csv")
df_list = []
for filename in sorted(interesting_files):
    df_list.append(pd.read_csv(filename))

full_df = pd.concat(df_list)

full_df.to_csv('output.csv')

But the above code does not work for the larger files and gives an error.

Error:

Traceback (most recent call last):
  File "merge_large.py", line 6, in <module>
    all_files = glob.glob("*.csv", encoding='utf8', engine='python')     
TypeError: glob() got an unexpected keyword argument 'encoding'
lakshmi@lakshmi-HP-15-Notebook-PC:~/Desktop/Twitter_Lat_lon/nasik_rain/rain_2$ python merge_large.py 
Traceback (most recent call last):
  File "merge_large.py", line 10, in <module>
    df = pd.read_csv(file_,index_col=None, header=0)
  File "/usr/local/lib/python2.7/dist-packages/pandas/io/parsers.py", line 562, in parser_f
    return _read(filepath_or_buffer, kwds)
  File "/usr/local/lib/python2.7/dist-packages/pandas/io/parsers.py", line 325, in _read
    return parser.read()
  File "/usr/local/lib/python2.7/dist-packages/pandas/io/parsers.py", line 815, in read
    ret = self._engine.read(nrows)
  File "/usr/local/lib/python2.7/dist-packages/pandas/io/parsers.py", line 1314, in read
    data = self._reader.read(nrows)
  File "pandas/parser.pyx", line 805, in pandas.parser.TextReader.read (pandas/parser.c:8748)
  File "pandas/parser.pyx", line 827, in pandas.parser.TextReader._read_low_memory (pandas/parser.c:9003)
  File "pandas/parser.pyx", line 881, in pandas.parser.TextReader._read_rows (pandas/parser.c:9731)
  File "pandas/parser.pyx", line 868, in pandas.parser.TextReader._tokenize_rows (pandas/parser.c:9602)
  File "pandas/parser.pyx", line 1865, in pandas.parser.raise_parser_error (pandas/parser.c:23325)
pandas.io.common.CParserError: Error tokenizing data. C error: Buffer overflow caught - possible malformed input file.

Code: 25+ columns, but file size over 10 MB

Option Two, Option Three:

Option Four:

import pandas as pd
import glob

interesting_files = glob.glob("*.csv")
df_list = []
for filename in sorted(interesting_files):
    df_list.append(pd.read_csv(filename))

full_df = pd.concat(df_list)

full_df.to_csv('output.csv')

Error:

Traceback (most recent call last):
  File "merge_large.py", line 6, in <module>
    allFiles = glob.glob("*.csv", sep=None)
TypeError: glob() got an unexpected keyword argument 'sep'

I have searched extensively but could not find a solution for concatenating large csv files with the same header into a single file.
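For reference, the key workaround in the answer below is to force pandas' slower but more forgiving Python parsing engine and let it sniff the delimiter. A minimal sketch of that idea applied to the merge (assuming the files really do share a single header; 'output.csv' is just an example name):

import glob
import pandas as pd

# sep=None together with engine='python' makes pandas sniff the delimiter
# for each file instead of using the stricter default C parser.
interesting_files = sorted(glob.glob("*.csv"))
df_list = [pd.read_csv(f, sep=None, engine='python') for f in interesting_files]

pd.concat(df_list).to_csv('output.csv', index=False)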

Edit:

Code:

import dask.dataframe as dd  

ddf = dd.read_csv('*.csv')

ddf.to_csv('master.csv',index=False)

Error:

Traceback (most recent call last):
  File "merge_csv_dask.py", line 5, in <module>
    ddf.to_csv('master.csv',index=False)
  File "/usr/local/lib/python2.7/dist-packages/dask/dataframe/core.py", line 792, in to_csv
    return to_csv(self, filename, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/dask/dataframe/io.py", line 762, in to_csv
    compute(*values)
  File "/usr/local/lib/python2.7/dist-packages/dask/base.py", line 179, in compute
    results = get(dsk, keys, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/dask/threaded.py", line 58, in get
    **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/dask/async.py", line 481, in get_async
    raise(remote_exception(res, tb))
dask.async.ValueError: could not convert string to float: u'type': u'Point', u'coordinates': [4.34279, 50.8443]

Traceback
---------
  File "/usr/local/lib/python2.7/dist-packages/dask/async.py", line 263, in execute_task
    result = _execute_task(task, data)
  File "/usr/local/lib/python2.7/dist-packages/dask/async.py", line 245, in _execute_task
    return func(*args2)
  File "/usr/local/lib/python2.7/dist-packages/dask/dataframe/csv.py", line 49, in bytes_read_csv
    coerce_dtypes(df, dtypes)
  File "/usr/local/lib/python2.7/dist-packages/dask/dataframe/csv.py", line 73, in coerce_dtypes
    df[c] = df[c].astype(dtypes[c])
  File "/usr/local/lib/python2.7/dist-packages/pandas/core/generic.py", line 2950, in astype
    raise_on_error=raise_on_error, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/pandas/core/internals.py", line 2938, in astype
    return self.apply('astype', dtype=dtype, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/pandas/core/internals.py", line 2890, in apply
    applied = getattr(b, f)(**kwargs)
  File "/usr/local/lib/python2.7/dist-packages/pandas/core/internals.py", line 434, in astype
    values=values, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/pandas/core/internals.py", line 477, in _astype
    values = com._astype_nansafe(values.ravel(), dtype, copy=True)
  File "/usr/local/lib/python2.7/dist-packages/pandas/core/common.py", line 1920, in _astype_nansafe
    return arr.astype(dtype)
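(A hedged guess at what is happening here: dask infers each column's dtype from a sample at the start of the data and then fails when a later block contains strings such as the GeoJSON-style coordinates shown in the error. If reading every column as a plain string is acceptable, that inference can be sidestepped by passing an explicit dtype through to pandas -- a sketch only, not verified against this data:)

import dask.dataframe as dd

# dtype=object is forwarded to pandas.read_csv for every block, so dask
# never tries to coerce a column it sampled as float but that later holds strings.
ddf = dd.read_csv('*.csv', dtype=object)
ddf.to_csv('master.csv', index=False)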

Answer 1:

If I understand your problem, your large csv files all have the same structure and you want to merge them into one big CSV file.

My suggestion is to use dask from Continuum Analytics for this job. You can merge the files, and you can also perform out-of-core computation and data analysis just like in pandas.

### make sure you include the [complete] tag
pip install dask[complete]

Solution using the sample data from your DropBox

First, check the version of dask. For me, dask = 0.11.0 and pandas = 0.18.1.

import dask
import pandas as pd
print (dask.__version__)
print (pd.__version__)

Here is the code that reads in all the csvs. I got no errors using your DropBox sample data.

import dask.dataframe as dd
from dask.delayed import delayed
import dask.bag as db
import glob

filenames = glob.glob('/Users/linwood/Downloads/stack_bundle/rio*.csv')

'''
The key to getting around the CParse error was using sep=None
Came from this post
http://***.com/questions/37505577/cparsererror-error-tokenizing-data
'''

# custom reader function for the csv files; sep=None lets pandas sniff the delimiter
def reader(filename):
    return pd.read_csv(filename,sep=None)

# build list of delayed pandas csv reads; then read in as dask dataframe

dfs = [delayed(reader)(fn) for fn in filenames]
df = dd.from_delayed(dfs)


'''
This is the final step.  The .compute() code below turns the 
dask dataframe into a single pandas dataframe with all your
files merged. If you don't need to write the merged file to
disk, I'd skip this step and do all the analysis in 
dask. Get a subset of the data you want and save that.  
'''
df = df.reset_index().compute()
df.to_csv('./test.csv')

The rest of this is extra stuff.

# print the count of values in each column; perfect data would have the same count
# you have dirty data as the counts will show

print (df.count().compute())

The next step is some pandas-like analysis. Here is some code where I first "clean" the data in the "tweetFavoriteCt" column. Not all of the data are integers, so I replace strings with "0" and convert everything else to integers. Once I have the integer conversion, I show a simple analysis where I filter the whole dataframe to include only the rows whose favoriteCt is greater than 3.

# function to convert numbers to integer and replace string with 0; sample analytics in dask dataframe
# you can come up with your own..this is just for an example
def conversion(value):
    try:
        return int(value)
    except:
        return int(0)

# apply the function to the column, create a new column of cleaned data
clean = df['tweetFavoriteCt'].apply(lambda x: (conversion(x)),meta=('stuff',str))

# set new column equal to our cleaning code above; your data is dirty :-(
df['cleanedFavoriteCt'] = clean

The last pieces of code show the dask analysis, how to load this merged file into pandas, and how to write the merged file to disk. Be warned: if you have a huge number of CSVs, when you use the .compute() code below it loads the merged csv into memory.

# retrieve the 50 tweets with the highest favorite count
print(df.nlargest(50,['cleanedFavoriteCt']).compute())

# only show me the tweets that have been favorited at least 3 times
# TweetID 763525237166268416, is VERRRRY popular....7000+ favorites
print((df[df.cleanedFavoriteCt.apply(lambda x: x>3,meta=('stuff',str))]).compute())

'''
This is the final step.  The .compute() code below turns the 
dask dataframe into a single pandas dataframe with all your
files merged. If you don't need to write the merged file to
disk, I'd skip this step and do all the analysis in 
dask. Get a subset of the data you want and save that.  
'''
df = df.reset_index().compute()
df.to_csv('./test.csv')

Now, if you want to switch to pandas for the merged csv file:

import pandas as pd
dff = pd.read_csv('./test.csv')

Let me know if this works.

That's it for this part.

Archived: the previous solution; a good example of using dask to merge CSVs

The first step is to make sure you have dask installed. There are install instructions for dask in the documentation page, but this should work:
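Presumably this is the same install command as at the top of this answer:

pip install dask[complete]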

Once dask is installed, reading the files is easy.

First, some housekeeping. Assume we have a directory containing the csvs, where the filenames are my18.csv, my19.csv, my20.csv, etc. Standardized names and a single directory location are key. This works if you put your csv files in one directory and serialize the names in some way.

Step by step:

    Import dask and read in all the csv files using a wildcard. This merges all the csvs into a single dask.dataframe object. You can do pandas-like operations immediately after this step if you want.
import dask.dataframe as dd  
ddf = dd.read_csv('./daskTest/my*.csv')
ddf.describe().compute()
    Write the merged dataframe to disk in the same directory as the original files and name it master.csv.
ddf.to_csv('./daskTest/master.csv',index=False)
    Optional: read the much larger master.csv into a dask.dataframe object for computation. This can also be done right after the first step above; dask can perform pandas-like operations on the staged file... this is one way to do "big data" in Python.
# reads in the merged file as one BIG out-of-core dataframe; can perform pandas-like functions
newddf = dd.read_csv('./daskTest/master.csv')

# check the length; this is now the length of all merged files. In this example, 50,000 rows times 11 files = 550,000 rows.
len(newddf)

# perform pandas-like summary stats on entire dataframe
newddf.describe().compute()

Hopefully this helps answer your question. In three steps you read in all the files, merge them into a single dataframe, and write that massive dataframe to disk with only one header and all the rows.

Comments:

Thank you so much for the detailed explanation. That is a lot of information. Let me try the code, and I will let you know if I have any doubts. Thanks again :)

Sorry for the late reply. I am getting an error; I have included the code and the error in the Edit section. Could you please check it. P.S. The dataframes have identical headers.

Will take a look; a few minutes ago I figured out how to use dask to merge and clean multiple JSON files from the Twitter API, so that is good news for finding a solution. I will look at your data, since you are using csv rather than json. In the meantime, here is my json merging pipeline: ***.com/questions/38760864/…

@SitzBlogz, take a look at the newly edited solution above. See whether you can at least perform the pandas-like operations.
