Python中数千个大表的外连接

Posted

技术标签:

【中文标题】Python中数千个大表的外连接【英文标题】:Outer join in Python for thousands of large tables 【发布时间】:2013-10-27 11:22:18 【问题描述】:

所以,我有大约 4,000 个 CSV 文件,我需要将它们全部外部连接。每个文件有两列(一个字符串和一个浮点数)和 10,000-1,000,000 行之间,我想通过第一列(即字符串变量)加入。

我尝试了numpy.lib.recfunctions.join_by,但速度非常慢。我切换到pandas.merge,这要快得多,但考虑到我拥有的表的数量(和大小)仍然太慢。而且它似乎真的是内存密集型 - 当要合并的文件有数十万行时机器变得无法使用(我主要使用 MacBook Pro,2.4GHz,4GB)。

所以现在我正在寻找替代方案 - 我还缺少其他潜在的解决方案吗? Python 还存在哪些其他外连接实现?是否有论文/网站讨论和比较每个实现的时间复杂度?如果我只是简单地调用 Python 调用 sqlite3,然后让 sqlite3 进行连接会更有效吗?字符串键是问题吗?如果我可以使用数字键,它会更快吗?

如果它有助于让您更具体地了解我想要实现的目标,这是我使用 pandas.merge 的代码:

import os
import pandas as pd

def load_and_merge(file_names, path_to_files, columns):
    '''
    seq, str, dict -> pandas.DataFrame
    '''
    output = pd.DataFrame(columns = ['mykey']) # initialize output DataFrame
    for file in file_names:

        # load new data
        new_data = pd.read_csv(path + file,
                               usecols = [col for col in columns.keys()],
                               dtype = columns,
                               names = ['mykey', file.replace('.csv', '')],
                               header = None)

        # merge with previous data
        output = pd.merge(output, new_data, on = 'mykey', how = 'outer')
        output = output.fillna(0) # kill NaNs

    return output

path = '/Users/username/data/'
files_list = [file for file in os.listdir(path) if '.csv' in file]
merged_table = load_and_merge(files_list, path, 0: 'S30', 1: 'float')

(Mac OS X 10.6.8 和 Python 2.7.5;Ubuntu 12.04 和 Python 2.7.3)

【问题讨论】:

接下来我会尝试 SQL。我不知道有什么 Python 实现可以胜过 pandas。 如果您将数据帧转储到持久存储中(hdf5 表示坚持),然后将其加载回新的 python 实例,您是否有相同的内存占用?即比较加载 100 个文件时的内存占用与从新 python 实例中的存储加载的结果数据帧 好问题,我没试过。接下来我会这样做。 你最终选择了什么?你接受杰夫的建议了吗?使用 SQL 吗?似乎是 SQL 数据库要做的那种问题。 我同意 Jeff 的建议,但我还没有完成。 【参考方案1】:

这是我解决这个问题的方法。

不要迭代合并。您正在合并一个较小的框架(称为“合并”)和一个较大的框架(称为“合并”)。然后重复此操作,导致“合并”变得更大并拥有更多行。

相反,您可以进行重复的分层合并。假设您将合并编号为 1-4000。

将 1 和 2 合并为 1_2

然后重复,然后你合并 1_2 和 3_4 形成 1_2_3_4

这不会改变您正在做的工作量,但它使每次合并变得更加简单,从而降低了内存障碍并花费了时间(因为它必须通过键的笛卡尔积)。随机化合并顺序可能是有意义的。

此外,这是完全可并行化的,因为您可以让独立的进程处理这个问题,生成中间合并。这实质上变成了 map-reduce 类型的问题。

您还可以将中间合并存储在 HDF5 文件中(使用HDFStore),这将提高存储效率。请注意,这些文件应该是单独的文件,以避免使用多个进程写入同一个文件(HDF5 不支持)。

【讨论】:

太棒了!事实上,我没有充分的理由迭代合并。然后我将尝试实现分层合并,并将数据存储在 HDF5 文件中。如果这还不够快,我也会尝试 MapReduce 选项。【参考方案2】:

好的,如果我理解正确的话,这是 Jeff 方法的部分实现(请参阅上面的答案)。我发布这个以防其他人试图做类似的事情。并且如果有人可以帮助改进或“美化”这段代码(现在这是一个长而丑陋的代码流......我想我应该以某种方式模块化它。)

这是一个部分实现,因为我没有并行化合并。我尝试过,使用 Python 的 multiprocessing 模块,但显然我没有足够的计算机内存 - 两个同时处理的进程足以冻结一切(或者我只是做了一些完全愚蠢的事情 - 很有可能,因为我从未使用过 @ 987654322@之前)。但剩下的就在这里:分层合并和 HDF5(用于存储中间文件)。

#!/usr/bin/env python

import os
import sys
import pandas as pd

def merge_csv(folder, cols_info):
    '''
    str, dict -> pandas.DataFrame

    This function outer joins all CSV files in the specified folder. It 
    joins them hierarchically and stores the intermediate files in an HDF5 
    container. The first parameter is the path to the folder and the second 
    parameter is a dictionary mapping each column to the corresponding data 
    type. You can only specify two columns.

    Example: 

    merge_csv('/Users/username/data/', 0: 'S30', 2: 'float')

    Dependencies:

    - HDF5
    - PyTables
    - pandas
    '''

    # ensure that user is specifying only two columns
    if len(cols_info) != 2:
        sys.exit('Error: You can only specify two columns.')

    # ensure that path to input folder ends with '/'
    folder = folder + '/' if folder[-1] != '/' else folder

    # create HDF5 file to store intermediate files
    store = pd.HDFStore(folder + 'store.h5', mode = 'w')

    # load CSV files and write data to HDF5 file
    flist = [file for file in os.listdir(folder) if file[-4:] == '.csv']
    if len(flist) == 0:
        sys.exit('There are no CSV files in the specified folder.')
    for file in flist:
        case = file.replace('.csv', '')
        store[case] = pd.read_csv(folder + file, 
                                  usecols = [col for col in cols_info], 
                                  names = ['key', case], 
                                  dtype = cols_info)
        store.flush()

    # start merge routine
    flist = store.keys()
    counter = 0
    while len(flist) > 1:
        counter += 1

        # merge current set of files, two by two
        length = (len(flist) - 1) if (len(flist) % 2 == 1) else len(flist)
        for i in range(length)[0:length:2]:
            merged_data = pd.merge(store[flist[i]], store[flist[i + 1]], 
                                   on = 'key', 
                                   how = 'outer',
                                   sort = False)
            outputfile = 'file' + str(i) + str(i + 1)

            # if number of files is odd, make last pair a trio
            if (i == len(flist) - 3) and (len(flist) % 2 == 1):
                merged_data = pd.merge(merged_data, store[flist[i + 2]], 
                                       on = 'key',
                                       how = 'outer', 
                                       sort = False)
                outputfile += str(i + 2)

            # save merged pair (or trio) to HDF5 file
            merged_data = merged_data.fillna(0)
            store.put('/tmp' + str(counter) + '/' + outputfile, merged_data)
            store.flush()

        # clean up
        to_remove = [file for file in store.keys() 
                     if 'tmp' + str(counter) + '/' not in file]
        for file in to_remove:
            store.remove(file)

        # move on to next set of intermediate files
        flist = store.keys()

    # wrap up
    store.close()
    return merged_data

编辑

仍然没有好处:中间矩阵最终变得太大并超过计算机内存并且代码崩溃(tables.exceptions.HDF5ExtError: Problems creating the Array.)。我已经尝试过 sqlite3,但也没有用,所以我想我只需要继续寻找。

【讨论】:

嘿,你有没有找到解决这个问题的方法? 不。我最终求助于蛮力(我大学的超级计算机)。

以上是关于Python中数千个大表的外连接的主要内容,如果未能解决你的问题,请参考以下文章

连接操作中的外键

sqlalchemy中的外连接

深入理解关系中的外连接,左外连接,右外连接

从两个大表的连接中选择不同的值

Linq中不等谓词的外连接等价

sql条件下解码中的外连接