来自延迟 zip csv 的 Dask 数据帧

Posted

技术标签:

【中文标题】来自延迟 zip csv 的 Dask 数据帧【英文标题】:Dask dataframe from delayed zip csv 【发布时间】:2018-10-19 03:07:37 【问题描述】:

我正在尝试从一组压缩的 CSV 文件创建一个 dask 数据框。阅读问题,似乎 dask 需要使用 dask.distributed delayed()

import glob
import dask.dataframe as dd
import zipfile
import pandas as pd 
from dask.delayed import delayed

#Create zip_dict with key-value pairs for .zip & .csv names
file_list = glob.glob('my_directory/zip_files/')
zip_dict = 
for f in file_list:
    key = f.split('/')[5][:-4]
    zip_dict[key] = zipfile.ZipFile(f)

zip_dict = 'log20160201'的示例内容: zipfile.ZipFile 文件名='/my_directory/zip_files/log20160201.zip' 模式='r','log20160218':zipfile.ZipFile 文件名='/my_directory/zip_files/log20160218.zip' 模式='r'

# Create list of delayed pd.read_csv()    
d_rows = []
for k, v in zip_dict.items():

    row = delayed(pd.read_csv)(v.open(k+'.csv'),usecols=['time','cik'])
    d_rows.append(row)
    v.close()

d_rows 的样本内容 = [延迟('read_csv-c05dc861-79c3-4e22-8da6-927f5b7da123'), 延迟('read_csv-4fe1c901-44b4-478b-9c11-4a80f7a639e2')]

big_df = dd.from_delayed(d_rows)  

返回的错误是: ValueError:无效的文件路径或缓冲区对象类型:类“列表”

【问题讨论】:

您好。发布一些有关文件名的附加信息可能会有所帮助。我注意到您似乎将文件名存储在zip_dict 中。您能否将zip_dict 的内容与for k, v in zip_dict.items(): print(k) 一起发布以显示*.csv 文件名?此外,在我看来,您希望将所有文件垂直连接到单个 dask 数据帧中。这是正确的吗? 是的,这是正确的,我正在尝试创建一个单一的 dask 数据框。我已经用更多细节更新了原始帖子。 【参考方案1】:

高级方法

在这种情况下,我认为您实际上不需要字典 zip_dict 来使用 Pandas 懒惰地读取这些压缩文件。基于this very similar SO question to read in (.gz) compressed *.csv files using Dask(也显示为here),并且由于您要加载多个文件,因此至少有两种可能的方法可供您使用

    使用dask.delayedpandas.read_csv 在这里,您可以将每个文件读入pandas.DataFrame,但不是实际执行读入内存,而是延迟此操作,从而创建延迟对象列表(创建此列表的方法至少有两种如下所示) 使用for 循环创建列表,类似于[delayed(pd.read_csv)(f) for f in file_list] 如果您有 17 个 .csv.zip 文件,那么这将创建一个包含 17 个延迟对象的列表 用mapfunctools.partial 创建一个列表,这将创建一个单元素列表,看起来像list(map(functools.partial(delayed(pd.read_csv), file_list))) 如果您有 17 个 .csv.zip 文件,那么这会创建一个包含 1 个延迟对象的列表 然后您使用dd.from_delayed 将此延迟对象列表转换为pandas.DataFrame 使用循环方法,这类似于dd.from_delayed(dfs) 使用map()functools.partial 方法,您将使用dd.from_delayed(dfs).repartition(file_list) 由于这种方法只给出一个单(延迟)元素列表,因此生成的 dask.dataframe 将具有将所有文件垂直连接到单个 dask.dataframe 分区中的效果 为了将 17 个文件中的每一个分隔到 dask.dataframe 的专用分区中,您需要使用 .repartition() 使用dask.dataframe.read_csv(file_list)directly,它实际上使用pandas.read_csv,因此它接受来自pandas.read_csv的许多关键字参数

在这两种方法中

Dask 的最佳实践是指定将读入的列 (as recommended) 的 dtypes 您可以使用字典来执行此操作,它看起来像"time": int, "cik": int,因为您只需要列timecik,并且您知道它们中的每一个都应该是int(整数) dtype 使用.read_csv()关键字 usecols 指定需要的列名列表 compression 表示正在读入一个.zip 文件

Python 代码

以下是根据需要使用简短的 cmets 实现这些方法的代码

进口

from functools import partial
from itertools import repeat
from glob import glob
from collections import OrderedDict

import dask.dataframe as dd
import numpy as np
import pandas as pd
from dask.delayed import delayed

生成虚拟数据文件

使用this SO answer,生成多个.csv文件

def generate_multiple_csvs(data_dir, file_num=1):
    col_names = list("ABCDEFG")+["time", "cik"]
    df = pd.DataFrame(np.random.randint(10, size=(10,9)), columns=col_names)
    filename = f"data_file_file_num.csv.zip"
    filepath = data_dir + "/" + filename
    df["filepath"] = filename
    df.to_csv(filepath, index=False, compression="zip")
    return df

# Specify path the directory where `.csv.zip` files should be created
data_dir = "data/processed"

# Specify number of files to create
num_files_wanted = 8
使用itertools.repeat 创建虚拟文件
_ = list(
    map(
        generate_multiple_csvs,
        repeat(data_dir, num_files_wanted),
        list(range(1, num_files_wanted+1)),
    )
)
使用functools.partial 创建虚拟文件
_ = list(
    map(
        partial(generate_multiple_csvs, data_dir),
        list(range(9, 9+num_files_wanted+1)),
    )
)

按文件类型获取文件列表

file_list = glob(data_dir + "/" + "*.zip")

为 Dask DataFrame 中的列指定列 dtypes(推荐)

my_dtypes = OrderedDict([("time",int), ("cik",int)])

方法 1 - 使用 dask.delayedfor 循环

# Lazily reading files into Pandas DataFrames by looping
dfs = [
    delayed(pd.read_csv)(f, compression='zip', usecols=['time','cik'])
    for f in file_list
]

# Combine into a single Dask DataFrame
ddf_from_delayed_loop = dd.from_delayed(dfs, meta=my_dtypes)

print(type(ddf_from_delayed_loop))
print(ddf_from_delayed_loop)
输出
<class 'dask.dataframe.core.DataFrame'>
Dask DataFrame Structure:
                 time    cik
npartitions=17              
                int64  int64
                  ...    ...
...               ...    ...
                  ...    ...
                  ...    ...
Dask Name: from-delayed, 34 tasks

方法 1 - 使用 dask.delayedmap

# Lazily reading files into Pandas DataFrames with Python's built-in map()
dfs = list(
    map(
        partial(
            delayed(pd.read_csv),
            compression="zip",
            usecols=['time', 'cik'],
        ),
        file_list,
    )
)

# Combine into a single Dask DataFrame and repartition
ddf_from_delayed_map = dd.from_delayed(dfs, meta=my_dtypes).repartition(
    npartitions=len(file_list)
)

print(type(ddf_from_delayed_map))
print(ddf_from_delayed_map)
输出
<class 'dask.dataframe.core.DataFrame'>
Dask DataFrame Structure:
                 time    cik
npartitions=17              
                int64  int64
                  ...    ...
...               ...    ...
                  ...    ...
                  ...    ...
Dask Name: from-delayed, 34 tasks

方法 2 - 直接使用dask.dataframe

# Lazily reading files into single Dask DataFrame
ddf_direct = dd.read_csv(
    data_dir+"/*.csv.zip",
    compression='zip',
    dtype=my_dtypes,
    blocksize=None,
    usecols=['time','cik'],
)

print(type(ddf_direct))
print(ddf_direct)
输出
<class 'dask.dataframe.core.DataFrame'>
Dask DataFrame Structure:
                 time    cik
npartitions=17              
                int64  int64
                  ...    ...
...               ...    ...
                  ...    ...
                  ...    ...
Dask Name: read-csv, 17 tasks

注意事项

    对于上述所有方法,指定分区数时应牢记以下几点 dask 最佳实践 choosing the optimal partition size (basics of this approach) avoid large computation graphs avoid large partitions factors to keep in mind for large datasets 使用batching for the dask.delayed approach with a for loop 来减少大量调用dask.delayed 的开销(请参阅this SO question 了解批处理实现)。

【讨论】:

以上是关于来自延迟 zip csv 的 Dask 数据帧的主要内容,如果未能解决你的问题,请参考以下文章

Dask:我如何将我的代码与 dask 延迟并行化?

使用 Python 将 Dask 数据帧转换为 Spark 数据帧

延迟渲染:在默认帧缓冲区中使用来自 Gbuffer 的深度缓冲区

如何从标准输入读取 dask 数据帧?

使用 Dask 数据帧的 Autosklearn 预测/ Autosklearn 对 dask 数据帧的支持

为啥来自 s3 的 dask read_csv 保留了这么多内存?