使用 Pandas 从大型 HDFStore 表中提高查询性能

Posted

技术标签:

【中文标题】使用 Pandas 从大型 HDFStore 表中提高查询性能【英文标题】:Improve Query Performance From a Large HDFStore Table with Pandas 【发布时间】:2014-05-11 17:15:56 【问题描述】:

我有一个大型(约 1.6 亿行)数据框,我已将其存储到磁盘中,如下所示:

    def fillStore(store, tablename):
        files = glob.glob('201312*.csv')
        names = ["ts", "c_id", "f_id","resp_id","resp_len", "s_id"]
        for f in files:
            df = pd.read_csv(f, parse_dates=True, index_col=0, names=names)
            store.append(tablename, df, format='table', data_columns=['c_id','f_id'])

该表有一个时间索引,除了时间之外,我将使用c_idf_id 进行查询(通过索引)。

我有另一个包含约 18000 个“事件”的数据框。每个事件都包含一些(少至数百,多至数十万)个人记录。我需要为每个事件收集一些简单的统计数据并将它们存储起来,以便收集一些汇总统计数据。目前我这样做:

def makeQueryString(c, f, start, stop):
    return "c_id ==  & f_id ==  & index >= Timestamp('') & index < Timestamp('')".format(c, f , str(pd.to_datetime(start)),str(pd.to_datetime(stop)))

def getIncidents(inc_times, store, tablename):
    incidents = pd.DataFrame(columns = ['c_id','f_id','resp_id','resp_len','s_id','incident_id'])
    for ind, row in inc_times.iterrows():
        incidents = incidents.append(store.select(tablename, 
                                                  makeQueryString(row.c_id, 
                                                                  row.f_id, 
                                                                  row.start, 
                                                                  row.stop))).fillna(ind)
    return incidents

这一切都很好,除了每个store.select() 语句大约需要 5 秒,这意味着处理整个月的数据需要 24-30 小时的处理时间。同时,我需要的实际统计数据比较简单:

def getIncidentStats(df):
    incLen = (df.index[-1]-df.index[0]).total_seconds()
    if incLen == 0:
        incLen = .1
    rqsts = len(df)
    rqstRate_s = rqsts/incLen
    return pd.Series('c_id':df.c_id[0],
                      'f_id':df.fqdn_id[0],
                      'Length_sec':incLen, 
                      'num_rqsts':rqsts, 
                      'rqst_rate':rqstRate_s, 
                      'avg_resp_size':df.response_len.mean(), 
                      'std_resp_size':df.response_len.std())


incs = getIncidents(i_times, store, tablename)
inc_groups = incs.groupby('incident_id')
inc_stats = inc_groups.apply(getIncidentStats)

我的问题是:我如何才能提高此工作流程任何部分的性能或效率?(请注意,我实际上是批量处理大部分工作以获取和存储事件的某一天时间只是因为我想限制在崩溃时丢失已处理数据的风险。为了简单起见,我把这段代码放在这里,因为我实际上需要处理整个月的数据。)

有没有办法在我从商店收到数据时对其进行处理,这有什么好处吗? 我会从使用 store.select_as_index 中受益吗?如果我收到索引,我仍然需要访问数据以获取正确的统计信息吗?

其他注意事项/问题:我比较了将 HDFStore 存储在 SSD 和普通硬盘驱动器上的性能,并没有发现 SSD 有任何改进。这是预期的吗?

我还玩弄了创建大量查询字符串并同时请求它们的想法。当总查询字符串过大(约 5-10 个查询)时,这会导致内存错误。

编辑 1 如果重要的话,我使用的是 3.1.0 版的表格和 0.13.1 版的熊猫

编辑 2 以下是更多信息:

ptdump -av store.h5
/ (RootGroup) ''
  /._v_attrs (AttributeSet), 4 attributes:
   [CLASS := 'GROUP',
    PYTABLES_FORMAT_VERSION := '2.0',
    TITLE := '',
    VERSION := '1.0']
/all_recs (Group) ''
  /all_recs._v_attrs (AttributeSet), 14 attributes:
   [CLASS := 'GROUP',
    TITLE := '',
    VERSION := '1.0',
    data_columns := ['c_id', 'f_id'],
    encoding := None,
    index_cols := [(0, 'index')],
    info := 1: 'type': 'Index', 'names': [None], 'index': 'index_name': 'ts',
    levels := 1,
    nan_rep := 'nan',
    non_index_axes := [(1, ['c_id', 'f_id', 'resp_id', 'resp_len', 'dns_server_id'])],
    pandas_type := 'frame_table',
    pandas_version := '0.10.1',
    table_type := 'appendable_frame',
    values_cols := ['values_block_0', 'c_id', 'f_id']]
/all_recs/table (Table(161738653,)) ''
  description := 
  "index": Int64Col(shape=(), dflt=0, pos=0),
  "values_block_0": Int64Col(shape=(3,), dflt=0, pos=1),
  "c_id": Int64Col(shape=(), dflt=0, pos=2),
  "f_id": Int64Col(shape=(), dflt=0, pos=3)
  byteorder := 'little'
  chunkshape := (5461,)
  autoindex := True
  colindexes := 
    "index": Index(6, medium, shuffle, zlib(1)).is_csi=False,
    "f_id": Index(6, medium, shuffle, zlib(1)).is_csi=False,
    "c_id": Index(6, medium, shuffle, zlib(1)).is_csi=False
  /all_recs/table._v_attrs (AttributeSet), 19 attributes:
   [CLASS := 'TABLE',
    FIELD_0_FILL := 0,
    FIELD_0_NAME := 'index',
    FIELD_1_FILL := 0,
    FIELD_1_NAME := 'values_block_0',
    FIELD_2_FILL := 0,
    FIELD_2_NAME := 'c_id',
    FIELD_3_FILL := 0,
    FIELD_3_NAME := 'f_id',
    NROWS := 161738653,
    TITLE := '',
    VERSION := '2.6',
    client_id_dtype := 'int64',
    client_id_kind := ['c_id'],
    fqdn_id_dtype := 'int64',
    fqdn_id_kind := ['f_id'],
    index_kind := 'datetime64',
    values_block_0_dtype := 'int64',
    values_block_0_kind := ['s_id', 'resp_len', 'resp_id']]

以下是主表和 inc_times 的示例:

In [12]: df.head()
Out[12]: 
                          c_id        f_id          resp_id      resp_len  \
ts                                                                   
2013-12-04 08:00:00  637092486  5372764353               30      56767543   
2013-12-04 08:00:01  637092486  5399580619               23      61605423   
2013-12-04 08:00:04    5456242  5385485460               21      46742687   
2013-12-04 08:00:04    5456242  5385485460               21      49909681   
2013-12-04 08:00:04  624791800  5373236646               14      70461449   

                              s_id  
ts                           
2013-12-04 08:00:00           1829  
2013-12-04 08:00:01           1724  
2013-12-04 08:00:04           1679  
2013-12-04 08:00:04           1874  
2013-12-04 08:00:04           1727  

[5 rows x 5 columns]


In [13]: inc_times.head()
Out[13]: 
        c_id     f_id                start                 stop
0       7254   196211  1385880945000000000  1385880960000000000
1       9286   196211  1387259840000000000  1387259850000000000
2      16032   196211  1387743730000000000  1387743735000000000
3      19793   196211  1386208175000000000  1386208200000000000
4      19793   196211  1386211800000000000  1386211810000000000

[5 rows x 4 columns]

关于 c_id 和 f_id,我想从完整存储中选择的 ID 集合与存储中的 ID 总数相比相对较少。换句话说,inc_times中有一些流行的ID,我会重复查询,而完全忽略全表中存在的一些ID。我估计我关心的 ID 大约占总 ID 的 10%,但这些是最受欢迎的 ID,因此它们的记录在整个集合中占主导地位。

我有 16GB 内存。完整存储为 7.4G,完整数据集(作为 csv 文件)仅为 8.7 GB。最初,我相信我能够将整个内容加载到内存中,并且至少可以对其进行一些有限的操作,但是在加载整个内容时出现内存错误。因此,将其批处理为每日文件(完整文件由一个月的数据组成)。

【问题讨论】:

你能在 hdf 文件上张贴ptdump -av 您能否发布您存储的数据样本以及inc_times 样本 c_id 和 f_id 的相对频率是多少,它们是相对独特还是很常见,您每次选择的范围有多大(例如时间戳范围) os / 你有多少主内存可用,存储的文件有多大,以 GB 为单位? 您能否为单个选择发布 %prun(例如,使用单个 makeQueryString)。并请在您编辑时发表评论(我会这样收到一条消息)。 【参考方案1】:

这里有一些建议,类似的问题是here

使用压缩:见here。你应该试试这个(这可能会使它更快/更慢,具体取决于你正在查询的内容),YMMV。

ptrepack --chunkshape=auto --propindexes --complevel=9 --complib=blosc in.h5 out.h5

分块使用分层查询。我的意思是这个。由于您关心的c_idf_id 数量相对较少,因此可以像这样构建一个查询。这有点像使用isin

f_ids = list_of_f_ids that I care about
c_ids = list_of_c_ids that I care about

def create_batches(l, maxn=32):
    """ create a list of batches, maxed at maxn """
    batches = []
    while(True):
        if len(l) <= maxn:
            if len(l) > 0:
                batches.append(l)
            break
        batches.append(l[0:maxn])
        l = l[maxn:]
    return batches


results = []
for f_id_batch in create_batches(f_id_list):

    for c_id_batch in create_batches(c_id_list):

        q = "f_id=f_id & c_id=c_id".format(
                f_id=f_id_batch,
                c_id=c_id_batch)

        # you can include the max/min times in here as well (they would be max/min
        # time for ALL the included batches though, maybe easy for you to compute

        result = store.select('df',where=q)

        # sub process this result

        def f(x):
            # you will need to filter out the min/max timestamps here (which I gather
            # are somewhat dependent on f_id/c_id group

            #### process the data and return something
            # you could do something like: ``return x.describe()`` for simple stats

         results.append(result.groupby(['f_id','c_id').apply(f))

results = pd.concat(results)

这里的关键是处理,使isin 的成员不超过 32 个 对于您要查询的任何变量。这是一个内部 numpy/pytables 限制。 如果超过此值,查询将起作用,但它会删除该变量并重新索引 在所有数据上(这不是您想要的)。

这样,您只需几个循环就可以在内存中拥有一个不错的数据子集。这些查询 我认为与您的大多数查询所花费的时间差不多,但您需要的时间会更少。

对于给定的子集,查询时间大致恒定(除非对数据进行排序以使其完全被索引)。

因此,查询会扫描“块”数据(这是索引指向的内容)。如果您在许多块中有很多命中,那么查询会比较慢。

这是一个例子

In [5]: N = 100000000

In [6]: df = DataFrame(np.random.randn(N,3),columns=['A','B','C'])

In [7]: df['c_id'] = np.random.randint(0,10,size=N)

In [8]: df['f_id'] = np.random.randint(0,10,size=N)

In [9]: df.index = date_range('20130101',periods=N,freq='s')

In [10]: df.to_hdf('test2.h5','df',mode='w',data_columns=['c_id','f_id'])

In [11]: df.head()
Out[11]: 
                            A         B         C  c_id  f_id
2013-01-01 00:00:00  0.037287  1.153534  0.639669     8     7
2013-01-01 00:00:01  1.741046  0.459821  0.194282     8     3
2013-01-01 00:00:02 -2.273919 -0.141789  0.770567     1     1
2013-01-01 00:00:03  0.320879 -0.108426 -1.310302     8     6
2013-01-01 00:00:04 -1.445810 -0.777090 -0.148362     5     5
2013-01-01 00:00:05  1.608211  0.069196  0.025021     3     6
2013-01-01 00:00:06 -0.561690  0.613579  1.071438     8     2
2013-01-01 00:00:07  1.795043 -0.661966  1.210714     0     0
2013-01-01 00:00:08  0.176347 -0.461176  1.624514     3     6
2013-01-01 00:00:09 -1.084537  1.941610 -1.423559     9     1
2013-01-01 00:00:10 -0.101036  0.925010 -0.809951     0     9
2013-01-01 00:00:11 -1.185520  0.968519  2.871983     7     5
2013-01-01 00:00:12 -1.089267 -0.333969 -0.665014     3     6
2013-01-01 00:00:13  0.544427  0.130439  0.423749     5     7
2013-01-01 00:00:14  0.112216  0.404801 -0.061730     5     4
2013-01-01 00:00:15 -1.349838 -0.639435  0.993495     0     9


In [2]: %timeit pd.read_hdf('test2.h5','df',where="f_id=[1] & c_id=[2]")
1 loops, best of 3: 13.9 s per loop

In [3]: %timeit pd.read_hdf('test2.h5','df',where="f_id=[1,2] & c_id=[1,2]")
1 loops, best of 3: 21.2 s per loop

In [4]: %timeit pd.read_hdf('test.2h5','df',where="f_id=[1,2,3] & c_id=[1,2,3]")
1 loops, best of 3: 42.8 s per loop

这个特定的例子是 5GB 未压缩和 2.9GB 压缩。这些结果是关于压缩数据的。在这种情况下,使用未压缩的实际上要快得多(例如,第一个循环需要 3.5 秒)。这是 100MM 行。

因此,使用最后一个示例 (4),您将在 3 倍多一点的查询时间中获得 9 倍于第一个示例的数据。

但是您的加速应该更多,因为您不会选择单个时间戳,而是稍后再做。

整个方法考虑到您有足够的主内存来保存批量大小的结果(例如,您在批量查询中选择了集合中相对较小的部分)。

【讨论】:

这个解决方案与我最终所做的非常相似(以及我未来工作的方向)。主要见解是限制对存储的查询数量,并使用每个选择来选择一个块,虽然很大,但仍然适合内存。在我的例子中,简单地将时间索引拆分为内存操作可以将总处理时间减少大约一半。将多个 c_id 和 f_id 批处理在一起可以进一步缩短时间。

以上是关于使用 Pandas 从大型 HDFStore 表中提高查询性能的主要内容,如果未能解决你的问题,请参考以下文章

pandas.HDFStore:如何修改现有商店的“data_columns”?我想为不在数据列中的列添加索引

使用 Pandas HDFStore 以只读模式打开文件

获取 HDF5 内容列表(Pandas HDFStore)

Pandas HDFStore:省略重复项

Python pandas 'HDFStore requires PyTables' Issue

Python Pandas hdfstore 的 select(where='') 返回不合格的结果