Pandas + HDF5 Panel 大数据存储
Posted
技术标签:
【中文标题】Pandas + HDF5 Panel 大数据存储【英文标题】:Pandas + HDF5 Panel data storage for large data 【发布时间】:2017-02-23 16:27:45 【问题描述】:作为我研究的一部分,我正在为我的面板数据寻找一个好的存储设计。我将 pandas 用于所有内存操作。我查看了以下两个问题/贡献,Large Data Work flows using Pandas 和 Query HDF5 Pandas 因为它们最接近我的设置。但是,我还有几个问题。首先,让我定义我的数据和一些要求:
大小:我有大约 800 个日期、9000 个 ID 和多达 200 个变量。因此,展平面板(沿日期和 ID)对应于 7.2mio 行和 200 列。这可能都适合内存或不适合,让我们假设它不适合。磁盘空间不是问题。
变量通常只计算一次,但更新/更改可能会不时发生。一旦发生更新,旧版本就不再重要了。
不时添加新变量,主要是一次一个。
不添加新行。
进行查询。例如,通常我只需要选择某个日期范围,例如date>start_date & date<end_date
。但有些查询需要考虑日期的排名条件。例如,获取rank(var1)>500 & rank(var1)<1000
所在的所有数据(即列),其中排名为截至日期。
目标是实现数据的快速读取/查询。数据写入不是那么重要。
我想到了下面的 HDF5 设计:
按照 groups_map 方法(1)将变量存储在不同的表中。将每组的列数限制为 10(以避免在更新单个变量时产生大量内存负载,请参见第 3 点)。
每个组代表一个表,其中我使用基于日期和 id 的多索引来存储每个表。
创建更新函数,以更新变量。这些函数将包含所有 (10) 列的表作为 df 加载到内存中,删除磁盘上的表,替换 df 中更新的变量并将表从内存中保存回磁盘。
创建一个添加函数,将 var1 添加到少于 10 列的组中,或者根据需要创建新组。保存类似于 3. 将当前组加载到内存,删除磁盘上的表,添加新列并将其保存回磁盘。
计算相关变量截至日期的排名,并将它们作为 rank_var1 添加到磁盘存储中,这应该会将查询时间减少到简单的rank_var1 > 500 & rank_var1<1000
。
我有以下问题:
更新 HDFTable,我想我必须删除整个表才能更新单个列?
何时使用 'data_columns',还是应该在 HDFStore.append() 中简单地分配 True?
如果我想根据rank_var1 > 500 & rank_var1<1000
的条件进行查询,但我需要其他组的列。我可以将从 rank_var1 条件接收到的索引输入到查询中以获取基于该索引的其他列(索引是具有日期和 ID 的多索引)吗?或者我是否需要按日期循环该索引,然后将类似于2 中建议的 ID 分块,并为我需要的每个组重复该过程。或者,(a)我可以向每个组添加表等级列,但在磁盘存储方面似乎效率极低。请注意,与等级过滤相关的变量数量是有限的(比如 5 个)。或者 (b) 我可以简单地使用从 rank_var1 查询接收到的 df_rank 并通过 df_rank.merge(df_tmp, left_index=True, right_index=True, how='left')
使用内存中的操作,并在我选择所需列的组 (df_tmp) 中循环。
假设我有一些不同频率的数据。我想为不同的频率使用不同的 group_maps(或不同的存储)是要走的路吗?
存储的副本可能用于 win/ux 系统。我认为它完全兼容,这里有什么要考虑的吗?
我打算使用pd.HDFStore(str(self.path), mode='a', complevel=9, complib='blosc')
。对 complevel 或 complib 有任何顾虑吗?
我已经开始编写一些代码,一旦我有东西要展示,我会根据需要进行编辑和添加。如果您需要更多信息,请告诉我。
编辑我这里是我的存储类的第一个版本,请相应地调整底部的路径。代码太长见谅,欢迎cmets
import pandas as pd
import numpy as np
import string
class LargeDFStorage():
# TODO add index features to ensure correct indexes
# index_names = ('date', 'id')
def __init__(self, h5_path, groups_map):
"""
Parameters
----------
h5_path: str
hdf5 storage path
groups_map: dict
where keys are group_names and values are dict, with at least key
'columns' where the value is list of column names.
A special group_name is reserved for group_name/key "query", which
can be used as queering and conditioning table when getting data,
see :meth:`.get`.
"""
self.path = str(h5_path)
self.groups_map = groups_map
self.column_map = self._get_column_map()
# if desired make part of arguments
self.complib = 'blosc'
self.complevel = 9
def _get_column_map(self):
""" Calc the inverse of the groups_map/ensures uniqueness of cols
Returns
-------
dict: with cols as keys and group_names as values
"""
column_map = dict()
for g, value in self.groups_map.items():
if len(set(column_map.keys()) & set(value['columns'])) > 0:
raise ValueError('Columns have to be unique')
for col in value['columns']:
column_map[col] = g
return column_map
@staticmethod
def group_col_names(store, group_name):
""" Returns all column names of specific group
Parameters
----------
store: pd.HDFStore
group_name: str
Returns
-------
list:
of all column names in the group
"""
if group_name not in store:
return []
# hack to get column names, straightforward way!?
return store.select(group_name, start=0, stop=0).columns.tolist()
@staticmethod
def stored_cols(store):
""" Collects all columns stored in HDF5 store
Parameters
----------
store: pd.HDFStore
Returns
-------
list:
a list of all columns currently in the store
"""
stored_cols = list()
for x in store.items():
group_name = x[0][1:]
stored_cols += LargeDFStorage.group_col_names(store, group_name)
return stored_cols
def _find_groups(self, columns):
""" Searches all groups required for covering columns
Parameters
----------
columns: list
list of valid columns
Returns
-------
list:
of unique groups
"""
groups = list()
for column in columns:
groups.append(self.column_map[column])
return list(set(groups))
def add_columns(self, df):
""" Adds columns to storage for the first time. If columns should
be updated use(use :meth:`.update` instead)
Parameters
----------
df: pandas.DataFrame
with new columns (not yet stored in any of the tables)
Returns
-------
"""
store = pd.HDFStore(self.path, mode='a' , complevel=self.complevel,
complib=self.complib)
# check if any column has been stored already
if df.columns.isin(self.stored_cols(store)).any():
store.close()
raise ValueError('Some cols are already in the store')
# find all groups needed to store the data
groups = self._find_groups(df.columns)
for group in groups:
v = self.groups_map[group]
# select columns of current group in df
select_cols = df.columns[df.columns.isin(v['columns'])].tolist()
tmp = df.reindex(columns=select_cols, copy=False)
# set data column to False only in case of query data
dc = None
if group=='query':
dc = True
stored_cols = self.group_col_names(store,group)
# no columns in group (group does not exists yet)
if len(stored_cols)==0:
store.append(group, tmp, data_columns=dc)
else:
# load current disk data to memory
df_grp = store.get(group)
# remove data from disk
store.remove(group)
# add new column(s) to df_disk
df_grp = df_grp.merge(tmp, left_index=True, right_index=True,
how='left')
# save old data with new, additional columns
store.append(group, df_grp, data_columns=dc)
store.close()
def _query_table(self, store, columns, where):
""" Selects data from table 'query' and uses where expression
Parameters
----------
store: pd.HDFStore
columns: list
desired data columns
where: str
a valid select expression
Returns
-------
"""
query_cols = self.group_col_names(store, 'query')
if len(query_cols) == 0:
store.close()
raise ValueError('No data to query table')
get_cols = list(set(query_cols) & set(columns))
if len(get_cols) == 0:
# load only one column to minimize memory usage
df_query = store.select('query', columns=query_cols[0],
where=where)
add_query = False
else:
# load columns which are anyways needed already
df_query = store.select('query', columns=get_cols, where=where)
add_query = True
return df_query, add_query
def get(self, columns, where=None):
""" Retrieve data from storage
Parameters
----------
columns: list/str
list of columns to use, or use 'all' if all columns should be
retrieved
where: str
a valid select statement
Returns
-------
pandas.DataFrame
with all requested columns and considering where
"""
store = pd.HDFStore(str(self.path), mode='r')
# get all columns in stored in HDFStorage
stored_cols = self.stored_cols(store)
if columns == 'all':
columns = stored_cols
# check if all desired columns can be found in storage
if len(set(columns) - set(stored_cols)) > 0:
store.close()
raise ValueError('Column(s): . not in storage'.format(
set(columns)- set(stored_cols)))
# get all relevant groups (where columns are taken from)
groups = self._find_groups(columns)
# if where query is defined retrieve data from storage, eventually
# only index of df_query might be used
if where is not None:
df_query, add_df_query = self._query_table(store, columns, where)
else:
df_query, add_df_query = None, False
# dd collector
df = list()
for group in groups:
# skip in case where was used and columns used from
if where is not None and group=='query':
continue
# all columns which are in group but also requested
get_cols = list(
set(self.group_col_names(store, group)) & set(columns))
tmp_df = store.select(group, columns=get_cols)
if df_query is None:
df.append(tmp_df)
else:
# align query index with df index from storage
df_query, tmp_df = df_query.align(tmp_df, join='left', axis=0)
df.append(tmp_df)
store.close()
# if any data of query should be added
if add_df_query:
df.append(df_query)
# combine all columns
df = pd.concat(df, axis=1)
return df
def update(self, df):
""" Updates data in storage, all columns have to be stored already in
order to be accepted for updating (use :meth:`.add_columns` instead)
Parameters
----------
df: pd.DataFrame
with index as in storage, and column as desired
Returns
-------
"""
store = pd.HDFStore(self.path, mode='a' , complevel=self.complevel,
complib=self.complib)
# check if all column have been stored already
if df.columns.isin(self.stored_cols(store)).all() is False:
store.close()
raise ValueError('Some cols have not been stored yet')
# find all groups needed to store the data
groups = self._find_groups(df.columns)
for group in groups:
dc = None
if group=='query':
dc = True
# load current disk data to memory
group_df = store.get(group)
# remove data from disk
store.remove(group)
# update with new data
group_df.update(df)
# save updated df back to disk
store.append(group, group_df, data_columns=dc)
store.close()
class DataGenerator():
np.random.seed(1282)
@staticmethod
def get_df(rows=100, cols=10, freq='M'):
""" Simulate data frame
"""
if cols < 26:
col_name = list(string.ascii_lowercase[:cols])
else:
col_name = range(cols)
if rows > 2000:
freq = 'Min'
index = pd.date_range('19870825', periods=rows, freq=freq)
df = pd.DataFrame(np.random.standard_normal((rows, cols)),
columns=col_name, index=index)
df.index.name = 'date'
df.columns.name = 'ID'
return df
@staticmethod
def get_panel(rows=1000, cols=500, items=10):
""" simulate panel data
"""
if items < 26:
item_names = list(string.ascii_lowercase[:cols])
else:
item_names = range(cols)
panel_ = dict()
for item in item_names:
panel_[item] = DataGenerator.get_df(rows=rows, cols=cols)
return pd.Panel(panel_)
def main():
# Example of with DataFrame
path = 'D:\\fc_storage.h5'
groups_map = dict(
a=dict(columns=['a', 'b', 'c', 'd', 'k']),
query=dict(columns=['e', 'f', 'g', 'rank_a']),
)
storage = LargeDFStorage(path, groups_map=groups_map)
df = DataGenerator.get_df(rows=200000, cols=15)
storage.add_columns(df[['a', 'b', 'c', 'e', 'f']])
storage.update(df[['a']]*3)
storage.add_columns(df[['d', 'g']])
print(storage.get(columns=['a','b', 'f'], where='f<0 & e<0'))
# Example with panel and rank condition
path2 = 'D:\\panel_storage.h5'
storage_pnl = LargeDFStorage(path2, groups_map=groups_map)
panel = DataGenerator.get_panel(rows=800, cols=2000, items=24)
df = panel.to_frame()
df['rank_a'] = df[['a']].groupby(level='date').rank()
storage_pnl.add_columns(df[['a', 'b', 'c', 'e', 'f']])
storage_pnl.update(df[['a']]*3)
storage_pnl.add_columns(df[['d', 'g', 'rank_a']])
print(storage_pnl.get(columns=['a','b','e', 'f', 'rank_a'],
where='f>0 & e>0 & rank_a <100'))
if __name__ == '__main__':
main()
【问题讨论】:
【参考方案1】:如果没有具体的例子,回答这些问题有点困难......
更新 HDFTable,我想我必须删除整个表 为了更新单个列?
AFAIK 是的,除非您单独存储单个列,但它会自动完成,您只需将 DF/Panel 写回 HDF 存储。
何时使用“data_columns”,或者我应该简单地在 HDFStore.append()?
data_columns=True
- 将索引 all 您的列 - IMO 这是浪费资源,除非您要在 where 参数中使用 all 列(即,如果所有列都应该被索引)。
我只会在where=
子句中指定那些经常用于搜索的列。将这些列视为数据库表中的索引列。
如果我想根据 rank_var1 > 500 & 的条件进行查询 rank_var1
我认为我们需要一些可重现的样本数据和您的查询示例才能给出合理的答案...
存储的副本可能用于 win/ux 系统。我认为是 完美兼容,这里有什么要考虑的吗?
是的,应该是完全兼容的
我打算使用 pd.HDFStore(str(self.path), mode='a', complevel=9, complib ='blosc')。对 complevel 或 complib 有任何顾虑吗?
使用您的数据对其进行测试 - 结果可能取决于数据类型、唯一值的数量等。您可能还需要考虑lzo
complib - 在某些用例中它可能会更快。检查this。有时较高的complevel
不会给你更好的压缩比,但会更慢(见my old comparison 的结果)
【讨论】:
所以我添加了一个冗长的代码示例(对此感到抱歉)。也许 storage_pnl 示例显示了我在查询排名数据条件时的意思。如果您有兴趣查看.get()
方法的详细信息。与where
一起使用的get()
在内存管理方面可能不是最有效的实现,但它在我的硬件上执行得相当快。以上是关于Pandas + HDF5 Panel 大数据存储的主要内容,如果未能解决你的问题,请参考以下文章
在 HDF5 中存储 Pandas 对象和常规 Python 对象
将 Pandas DataFrames 保存为 HDF5 存储,各种错误
如何将 Pandas DataFrame 存储为 HDF5 PyTables 表(或 CArray、EArray 等)?