Pandas + HDF5 Panel 大数据存储

Posted

技术标签:

【中文标题】Pandas + HDF5 Panel 大数据存储【英文标题】:Pandas + HDF5 Panel data storage for large data 【发布时间】:2017-02-23 16:27:45 【问题描述】:

作为我研究的一部分,我正在为我的面板数据寻找一个好的存储设计。我将 pandas 用于所有内存操作。我查看了以下两个问题/贡献,Large Data Work flows using Pandas 和 Query HDF5 Pandas 因为它们最接近我的设置。但是,我还有几个问题。首先,让我定义我的数据和一些要求:

    大小:我有大约 800 个日期、9000 个 ID 和多达 200 个变量。因此,展平面板(沿日期和 ID)对应于 7.2mio 行和 200 列。这可能都适合内存或不适合,让我们假设它不适合。磁盘空间不是问题。

    变量通常只计算一次,但更新/更改可能会不时发生。一旦发生更新,旧版本就不再重要了。

    不时添加新变量,主要是一次一个。

    不添加新行。

    进行查询。例如,通常我只需要选择某个日期范围,例如date>start_date & date<end_date。但有些查询需要考虑日期的排名条件。例如,获取rank(var1)>500 & rank(var1)<1000 所在的所有数据(即列),其中排名为截至日期。

目标是实现数据的快速读取/查询。数据写入不是那么重要。

我想到了下面的 HDF5 设计:

    按照 groups_map 方法(1)将变量存储在不同的表中。将每组的列数限制为 10(以避免在更新单个变量时产生大量内存负载,请参见第 3 点)。

    每个组代表一个表,其中我使用基于日期和 id 的多索引来存储每个表。

    创建更新函数,以更新变量。这些函数将包含所有 (10) 列的表作为 df 加载到内存中,删除磁盘上的表,替换 df 中更新的变量并将表从内存中保存回磁盘。

    创建一个添加函数,将 var1 添加到少于 10 列的组中,或者根据需要创建新组。保存类似于 3. 将当前组加载到内存,删除磁盘上的表,添加新列并将其保存回磁盘。

    计算相关变量截至日期的排名,并将它们作为 rank_var1 添加到磁盘存储中,这应该会将查询时间减少到简单的rank_var1 > 500 & rank_var1<1000

我有以下问题:

    更新 HDFTable,我想我必须删除整个表才能更新单个列?

    何时使用 'data_columns',还是应该在 HDFStore.append() 中简单地分配 True?

    如果我想根据rank_var1 > 500 & rank_var1<1000的条件进行查询,但我需要其他组的列。我可以将从 rank_var1 条件接收到的索引输入到查询中以获取基于该索引的其他列(索引是具有日期和 ID 的多索引)吗?或者我是否需要按日期循环该索引,然后将类似于2 中建议的 ID 分块,并为我需要的每个组重复该过程。或者,(a)我可以向每个组添加表等级列,但在磁盘存储方面似乎效率极低。请注意,与等级过滤相关的变量数量是有限的(比如 5 个)。或者 (b) 我可以简单地使用从 rank_var1 查询接收到的 df_rank 并通过 df_rank.merge(df_tmp, left_index=True, right_index=True, how='left') 使用内存中的操作,并在我选择所需列的组 (df_tmp) 中循环。

    假设我有一些不同频率的数据。我想为不同的频率使用不同的 group_maps(或不同的存储)是要走的路吗?

    存储的副本可能用于 win/ux 系统。我认为它完全兼容,这里有什么要考虑的吗?

    我打算使用pd.HDFStore(str(self.path), mode='a', complevel=9, complib='blosc')。对 complevel 或 complib 有任何顾虑吗?

我已经开始编写一些代码,一旦我有东西要展示,我会根据需要进行编辑和添加。如果您需要更多信息,请告诉我。

编辑我这里是我的存储类的第一个版本,请相应地调整底部的路径。代码太长见谅,欢迎cmets

import pandas as pd
import numpy as np
import string

class LargeDFStorage():

    # TODO add index features to ensure correct indexes
    # index_names = ('date', 'id')

    def __init__(self, h5_path, groups_map):
        """

        Parameters
        ----------
        h5_path: str
            hdf5 storage path
        groups_map: dict
            where keys are group_names and values are dict, with at least key
            'columns' where the value is list of column names.
            A special group_name is reserved for group_name/key "query", which
            can be used as queering and conditioning table when getting data,
            see :meth:`.get`.
        """

        self.path = str(h5_path)
        self.groups_map = groups_map
        self.column_map = self._get_column_map()
        # if desired make part of arguments
        self.complib = 'blosc'
        self.complevel = 9

    def _get_column_map(self):
        """ Calc the inverse of the groups_map/ensures uniqueness of cols

        Returns
        -------
        dict: with cols as keys and group_names as values
        """
        column_map = dict()
        for g, value in self.groups_map.items():
            if len(set(column_map.keys()) & set(value['columns'])) > 0:
                raise ValueError('Columns have to be unique')
            for col in value['columns']:
                column_map[col] = g

        return column_map

    @staticmethod
    def group_col_names(store, group_name):
        """ Returns all column names of specific group

        Parameters
        ----------
        store: pd.HDFStore
        group_name: str

        Returns
        -------
        list:
            of all column names in the group
        """
        if group_name not in store:
            return []

        # hack to get column names, straightforward way!?
        return store.select(group_name, start=0, stop=0).columns.tolist()

    @staticmethod
    def stored_cols(store):
        """ Collects all columns stored in HDF5 store

        Parameters
        ----------
        store: pd.HDFStore

        Returns
        -------
        list:
            a list of all columns currently in the store
        """
        stored_cols = list()
        for x in store.items():
            group_name = x[0][1:]
            stored_cols += LargeDFStorage.group_col_names(store, group_name)

        return stored_cols

    def _find_groups(self, columns):
        """ Searches all groups required for covering columns

        Parameters
        ----------
        columns: list
            list of valid columns

        Returns
        -------
        list:
            of unique groups
        """
        groups = list()
        for column in columns:
            groups.append(self.column_map[column])

        return list(set(groups))

    def add_columns(self, df):
        """ Adds columns to storage for the first time. If columns should
        be updated use(use :meth:`.update` instead)

        Parameters
        ----------
        df: pandas.DataFrame
            with new columns (not yet stored in any of the tables)

        Returns
        -------

        """
        store = pd.HDFStore(self.path, mode='a' , complevel=self.complevel,
                            complib=self.complib)

        # check if any column has been stored already
        if df.columns.isin(self.stored_cols(store)).any():
            store.close()
            raise ValueError('Some cols are already in the store')

        # find all groups needed to store the data
        groups = self._find_groups(df.columns)

        for group in groups:
            v = self.groups_map[group]

            # select columns of current group in df
            select_cols = df.columns[df.columns.isin(v['columns'])].tolist()
            tmp = df.reindex(columns=select_cols, copy=False)

            # set data column to False only in case of query data
            dc = None
            if group=='query':
                dc = True

            stored_cols = self.group_col_names(store,group)
            # no columns in group (group does not exists yet)
            if len(stored_cols)==0:
                store.append(group, tmp, data_columns=dc)
            else:
                # load current disk data to memory
                df_grp = store.get(group)
                # remove data from disk
                store.remove(group)
                # add new column(s) to df_disk
                df_grp = df_grp.merge(tmp, left_index=True, right_index=True,
                                      how='left')
                # save old data with new, additional columns
                store.append(group, df_grp, data_columns=dc)

        store.close()

    def _query_table(self, store, columns, where):
        """ Selects data from table 'query' and uses where expression

        Parameters
        ----------
        store: pd.HDFStore
        columns: list
            desired data columns
        where: str
            a valid select expression

        Returns
        -------

        """

        query_cols = self.group_col_names(store, 'query')
        if len(query_cols) == 0:
            store.close()
            raise ValueError('No data to query table')
        get_cols = list(set(query_cols) & set(columns))
        if len(get_cols) == 0:
            # load only one column to minimize memory usage
            df_query = store.select('query', columns=query_cols[0],
                                    where=where)
            add_query = False
        else:
            # load columns which are anyways needed already
            df_query = store.select('query', columns=get_cols, where=where)
            add_query = True

        return df_query, add_query

    def get(self, columns, where=None):
        """ Retrieve data from storage

        Parameters
        ----------
        columns: list/str
            list of columns to use, or use 'all' if all columns should be
            retrieved
        where: str
            a valid select statement

        Returns
        -------
        pandas.DataFrame
            with all requested columns and considering where
        """
        store = pd.HDFStore(str(self.path), mode='r')

        # get all columns in stored in HDFStorage
        stored_cols = self.stored_cols(store)

        if columns == 'all':
            columns = stored_cols

        # check if all desired columns can be found in storage
        if len(set(columns) - set(stored_cols)) > 0:
            store.close()
            raise ValueError('Column(s): . not in storage'.format(
                set(columns)- set(stored_cols)))

        # get all relevant groups (where columns are taken from)
        groups = self._find_groups(columns)

        # if where query is defined retrieve data from storage, eventually
        # only index of df_query might be used
        if where is not None:
            df_query, add_df_query = self._query_table(store, columns, where)
        else:
            df_query, add_df_query = None, False

        # dd collector
        df = list()
        for group in groups:
            # skip in case where was used and columns used from
            if where is not None and group=='query':
                continue
            # all columns which are in group but also requested
            get_cols = list(
                set(self.group_col_names(store, group)) & set(columns))

            tmp_df = store.select(group, columns=get_cols)
            if df_query is None:
                df.append(tmp_df)
            else:
                # align query index with df index from storage
                df_query, tmp_df = df_query.align(tmp_df, join='left', axis=0)
                df.append(tmp_df)

        store.close()

        # if any data of query should be added
        if add_df_query:
            df.append(df_query)

        # combine all columns
        df = pd.concat(df, axis=1)

        return df

    def update(self, df):
        """ Updates data in storage, all columns have to be stored already in
        order to be accepted for updating (use :meth:`.add_columns` instead)

        Parameters
        ----------
        df: pd.DataFrame
            with index as in storage, and column as desired


        Returns
        -------

        """
        store = pd.HDFStore(self.path, mode='a' , complevel=self.complevel,
                            complib=self.complib)

        # check if all column have been stored already
        if df.columns.isin(self.stored_cols(store)).all() is False:
            store.close()
            raise ValueError('Some cols have not been stored yet')

        # find all groups needed to store the data
        groups = self._find_groups(df.columns)
        for group in groups:
            dc = None
            if group=='query':
                dc = True
            # load current disk data to memory
            group_df = store.get(group)
            # remove data from disk
            store.remove(group)
            # update with new data
            group_df.update(df)
            # save updated df back to disk
            store.append(group, group_df, data_columns=dc)

        store.close()


class DataGenerator():
    np.random.seed(1282)

    @staticmethod
    def get_df(rows=100, cols=10, freq='M'):
        """ Simulate data frame
        """
        if cols < 26:
            col_name = list(string.ascii_lowercase[:cols])
        else:
            col_name = range(cols)
        if rows > 2000:
            freq = 'Min'
        index = pd.date_range('19870825', periods=rows, freq=freq)
        df = pd.DataFrame(np.random.standard_normal((rows, cols)),
                          columns=col_name, index=index)
        df.index.name = 'date'
        df.columns.name = 'ID'
        return df

    @staticmethod
    def get_panel(rows=1000, cols=500, items=10):
        """ simulate panel data
        """

        if items < 26:
            item_names = list(string.ascii_lowercase[:cols])
        else:
            item_names = range(cols)
        panel_ = dict()

        for item in item_names:
            panel_[item] = DataGenerator.get_df(rows=rows, cols=cols)

        return pd.Panel(panel_)


def main():
    # Example of with DataFrame
    path = 'D:\\fc_storage.h5'
    groups_map = dict(
        a=dict(columns=['a', 'b', 'c', 'd', 'k']),
        query=dict(columns=['e', 'f', 'g', 'rank_a']),
    )
    storage = LargeDFStorage(path, groups_map=groups_map)
    df = DataGenerator.get_df(rows=200000, cols=15)
    storage.add_columns(df[['a', 'b', 'c', 'e', 'f']])
    storage.update(df[['a']]*3)
    storage.add_columns(df[['d', 'g']])

    print(storage.get(columns=['a','b', 'f'], where='f<0 & e<0'))

    # Example with panel and rank condition
    path2 = 'D:\\panel_storage.h5'
    storage_pnl = LargeDFStorage(path2, groups_map=groups_map)
    panel = DataGenerator.get_panel(rows=800, cols=2000, items=24)
    df = panel.to_frame()
    df['rank_a'] = df[['a']].groupby(level='date').rank()
    storage_pnl.add_columns(df[['a', 'b', 'c', 'e', 'f']])
    storage_pnl.update(df[['a']]*3)
    storage_pnl.add_columns(df[['d', 'g', 'rank_a']])
    print(storage_pnl.get(columns=['a','b','e', 'f', 'rank_a'],
                          where='f>0 & e>0 & rank_a <100'))


if __name__ == '__main__':
    main()

【问题讨论】:

【参考方案1】:

如果没有具体的例子,回答这些问题有点困难......

更新 HDFTable,我想我必须删除整个表 为了更新单个列?

AFAIK 是的,除非您单独存储单个列,但它会自动完成,您只需将 DF/Panel 写回 HDF 存储。

何时使用“data_columns”,或者我应该简单地在 HDFStore.append()?

data_columns=True - 将索引 all 您的列 - IMO 这是浪费资源,除非您要在 where 参数中使用 all 列(即,如果所有列都应该被索引)。 我只会在where= 子句中指定那些经常用于搜索的列。将这些列视为数据库表中的索引列。

如果我想根据 rank_var1 > 500 & 的条件进行查询 rank_var1

我认为我们需要一些可重现的样本数据和您的查询示例才能给出合理的答案...

存储的副本可能用于 win/ux 系统。我认为是 完美兼容,这里有什么要考虑的吗?

是的,应该是完全兼容的

我打算使用 pd.HDFStore(str(self.path), mode='a', complevel=9, complib ='blosc')。对 complevel 或 complib 有任何顾虑吗?

使用您的数据对其进行测试 - 结果可能取决于数据类型、唯一值的数量等。您可能还需要考虑lzo complib - 在某些用例中它可能会更快。检查this。有时较高的complevel 不会给你更好的压缩比,但会更慢(见my old comparison 的结果)

【讨论】:

所以我添加了一个冗长的代码示例(对此感到抱歉)。也许 storage_pnl 示例显示了我在查询排名数据条件时的意思。如果您有兴趣查看.get() 方法的详细信息。与where 一起使用的get() 在内存管理方面可能不是最有效的实现,但它在我的硬件上执行得相当快。

以上是关于Pandas + HDF5 Panel 大数据存储的主要内容,如果未能解决你的问题,请参考以下文章

Pandas系列之入门篇——HDF5

在 Pandas 中迭代写入 HDF5 存储

在 HDF5 中存储 Pandas 对象和常规 Python 对象

将 Pandas DataFrames 保存为 HDF5 存储,各种错误

使用 pandas 读取 hdf5 数据集

如何将 Pandas DataFrame 存储为 HDF5 PyTables 表(或 CArray、EArray 等)?