Python Pandas 改进了目前需要约 400 分钟运行的大型数据集的计算时间

Posted

技术标签:

【中文标题】Python Pandas 改进了目前需要约 400 分钟运行的大型数据集的计算时间【英文标题】:Python Pandas improving calculation time for large datasets currently taking ~400 mins to run 【发布时间】:2021-03-10 09:59:42 【问题描述】:

我正在尝试提高我每天需要构建的 DataFrame 的性能,我想知道是否有人有一些想法。我在下面创建了一个简单的示例:

首先,我有一个像这样的DataFramedict。这是时间序列数据,因此每天都会更新。

import pandas as pd
import numpy as np
import datetime as dt
from scipy import stats

dates = [dt.datetime.today().date() - dt.timedelta(days=x) for x in range(2000)]

m_list = [str(i) + 'm' for i in range(0, 15)]
names = [i + j  for i in m_list for j in m_list]

keys = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J']
values = [pd.DataFrame([np.random.rand(225) for x in range(0, 2000)], index=dates, columns=names) for i in range(0, 10)]

df_dict = dict(zip(keys, values))    #this is my time series data

接下来我有三个列表:

#I will build a dict of DataFrames calc attributes for these combos for each df in dict_df above
combos = ['/'.format(*np.random.choice(names, 2)) for i in range(750)] + ['//'.format(*np.random.choice(names, 3)) for i in range(1500)]

periods = [20, 60, 100, 200, 500, 1000, 2000]   #num of datapoints to use from time series
benchmarks = np.random.choice(combos, 25)       #benchmarks to compare combos to

然后这里是我构建我需要的数据帧的地方:

def calc_beta (a_series, b_series) :

    covariance = np.cov (a_series, b_series)     
    beta = covariance[0, 1] / covariance[1, 1]    
    
    return beta

data_dict = 

for i in list(df_dict.keys()) :
    
    attr_list = []
    
    df = df_dict[i]
    
    for c in combos :
        
        c_split = c.split('/')
        combo_list = []
        for cs in c_split :
            _list = [int(x) for x in list(filter(None, cs.split('m')))]
            combo_list.append(_list)
        if len(combo_list) == 2 :
            combo_list.append([np.nan, np.nan])
        
        c1a, c1b, c2a, c2b, c3a, c3b = [item for subl in combo_list for item in subl]
        
        if len(c_split) == 2 :
            l1, l2 = c_split
            _series = df[l1] - df[l2]
            
        if len(c_split) == 3 :
            l1, l2, l3 = c_split
            _series = df[l1] - df[l2] - df[l3]
        
        attr = 
            
            'name' : c,
            'a' : c1a,
            'b' : c1b,
            'c' : c2a,
            'd' : c2b,
            'e' : c3a,
            'f' : c3b,
            'series' : _series,
            'last' : _series[-1]
        
        
        for p in periods :
            _str = str(p)
            p_series = _series[-p:]
            
            attr['quantile' + _str] = stats.percentileofscore(p_series, attr['last'])
            attr['z_score' + _str] = stats.zscore(p_series)[-1]
            attr['std' + _str] = np.std(p_series)            
            attr['range' + _str] = max(p_series) - min(p_series)
            attr['last_range' + _str] = attr['last'] / attr['range' + _str]
            attr['last_std' + _str] = attr['last'] / attr['std' + _str]        
            
            if p > 100 :
                attr['5d_autocorr' + _str] = p_series.autocorr(-5)
            else :
                attr['5d_autocorr' + _str] = np.nan
                
            for b in benchmarks :
                b_split = b.split('/')
                
                if len(b_split) == 1 :
                    b_series = df[b_split[0]]
                    
                elif len(b_split) == 2 :                    
                    b_series = df[b_split[0]] - df[b_split[1]]  
                
                elif len(b_split) == 3 :                    
                    b_series = df[b_split[0]] - df[b_split[1]] - df[b_split[2]]  
                
                b_series = b_series[-p:]
                
                corr_value = p_series.corr(b_series)
                
                beta_value = calc_beta (p_series, b_series)
                
                corr_ticker = '_corr'.format(b, _str)
                beta_ticker = '_beta'.format(b, _str)
                
                attr[corr_ticker] = corr_value    
                attr[beta_ticker] = corr_value    
        
                if p > 500 :
                    attr[b + '_20rolling_corr_mean' + _str] = p_series.rolling(20).corr(b_series).mean()

                    df1 = pd.DataFrame(c : p_series, b : b_series)

                    attr[b + '_20d_rolling_beta_mean' + _str] =  df1.rolling(20) \
                                                                    .cov(df1 , pairwise=True) \
                                                                    .drop([c], axis=1) \
                                                                    .unstack(1) \
                                                                    .droplevel(0, axis=1) \
                                                                    .apply(lambda row: row[c] / row[b], axis=1) \
                                                                    .mean()
        
        attr_list.append(attr)
    
    data_dict[i] = pd.DataFrame(attr_list)

这是实际数据的通用示例,但它几乎完全复制了我正在尝试做的每一种计算,尽管我减少了数字以使其更简单。

最后一部分在字典中每个 DataFrame 大约需要 40 分钟,即这个数据集总共需要 400 分钟。

我过去没有使用过大型数据集,据我了解,我需要最小化我拥有的 For 循环和 Apply 函数,但我还应该做什么?感谢您的任何意见。

谢谢

【问题讨论】:

您能否缩小范围,或将问题拆分为明确的部分(描述您在每个部分中所做的事情)。目前所说的问题需要大量时间来理解,不确定您是否可以找到专门为您提供帮助的人...... 知道了,我试图平衡一个现实问题与简化问题,但也许我的平衡错了。您可以将代码放在第一个 for 循环存在的地方,然后删除每个循环中的所有计算,除了一个,这样您就可以看到代码的结构但不一定看到每个计算? calc_beta 函数在哪里?如果我只是剪切/粘贴您的代码,beta_value = calc_beta (p_series, b_series) 会引发错误。 @JonathanLeon 很好,很抱歉。现在刚刚添加。 你考虑过Dask吗?它会让你的 CPU 的所有核心都参与其中。 【参考方案1】:

所以,我去了一个黑暗的地方想办法在这里提供帮助:)

总而言之,它是脚本末尾的两个函数,其中 p>500 正在杀死你。当 p

我没有从本质上通过组合进行迭代并填写您的数据框,而是采用了从具有所有组合的数据框开始的方法(在上面的示例中,2500 行)。然后向右工作并尽可能矢量化。我认为这里有很多改进之处,但我无法让它像我想要的那样正常工作,所以也许其他人可以提供帮助。

这是我最终得到的代码。它在您输入问题后开始。

import pandas as pd
import numpy as np
import datetime as dt
from scipy import stats
import time

def calc_beta (a_series, b_series) :
    covariance = np.cov (a_series, b_series)
    beta = covariance[0, 1] / covariance[1, 1]
    return beta

dates = [dt.datetime.today().date() - dt.timedelta(days=x) for x in range(2000)]

m_list = [str(i) + 'm' for i in range(0, 15)]
names = [i + j  for i in m_list for j in m_list]

#keys = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J']
keys = ['A']
values = [pd.DataFrame([np.random.rand(225) for x in range(0, 2000)], index=dates, columns=names) for i in range(0, 10)]

df_dict = dict(zip(keys, values))

combos = ['/'.format(*np.random.choice(names, 2)) for i in range(750)] + ['//'.format(*np.random.choice(names, 3)) for i in range(1500)]

#periods = [20, 60, 100, 200, 500, 1000, 2000]   #num of datapoints to use from time series
periods = [20]   #num of datapoints to use from time series
benchmarks = np.random.choice(combos, 25)       #benchmarks to compare combos to

data_dict = 

for i in list(df_dict.keys()):
    df = df_dict[i]
    mydf =  pd.DataFrame(combos, columns=['name'])
    mydf[['a','b','c','d','e','f']]=mydf.name.str.replace('/', '').str.replace('m', ',').str[0:-1].str.split(',', expand=True)

    def get_series(a):
        if len(a) == 2 :
            l1, l2 = a
            s = df[l1] - df[l2]
            return s.tolist()
        else:
            l1, l2, l3 = a
            s = df[l1] - df[l2] - df[l3]
            return s.tolist()

    mydf['series'] = mydf['name'].apply(lambda x: get_series(x.split('/')))
    mydf['last'] = mydf['series'].str[-1]

    for p in periods:
        _str = str(p)

        mydf['quantile' + _str] =  mydf.apply(lambda x: stats.percentileofscore(x['series'][-p:], x['last']), axis=1)
        mydf['z_score' + _str] = mydf.apply(lambda x: stats.zscore(x['series'][-p:])[-1], axis=1)
        mydf['std' + _str] = mydf.apply(lambda x: np.std(x['series'][-p:]), axis=1)
        mydf['range' + _str] = mydf.apply(lambda x: max(x['series'][-p:]) - min(x['series'][-p:]), axis=1)
        mydf['last_range' + _str] = mydf['last'] / mydf['range' + _str]
        mydf['last_std' + _str] = mydf['last'] / mydf['std' + _str]

        if p > 100 :
            mydf['5d_autocorr' + _str] = mydf.apply(lambda x: pd.Series(x['series'][-p:]).autocorr(-5), axis=1)
        else :
            mydf['5d_autocorr' + _str] = np.nan

        def get_series(a):
            if len(a) == 1 :
                b = df[a[0]]
                return b.tolist()
            elif len(a) == 2 :
                b = df[a[0]] - df[a[1]]
                return b.tolist()
            else:
                b = df[a[0]] - df[a[1]] - df[a[2]]
                return b.tolist()

        for b in benchmarks:
            corr_ticker = '_corr'.format(b, _str)
            beta_ticker = '_beta'.format(b, _str)

            b_series = get_series(b.split('/'))[-p:]

            mydf[corr_ticker] = mydf.apply(lambda x: stats.pearsonr(np.array(x['series'][-p:]), np.array(b_series))[0], axis=1)
            mydf[beta_ticker] = mydf.apply(lambda x: calc_beta(np.array(x['series'][-p:]), np.array(b_series)), axis=1)

            if p > 500 :
                mydf[b + '_20rolling_corr_mean' + _str] = mydf.apply(lambda x: pd.Series(x['series'][-p:]).rolling(20).corr(pd.Series(b_series)).mean(), axis=1)
                mydf[b + '_20d_rolling_beta_mean' + _str] =  mydf.apply(lambda x: pd.DataFrame(x['name']: pd.Series(x['series'][-p:]), b : pd.Series(b_series)).rolling(20) \
                                                                .cov(pd.DataFrame(x['name']: pd.Series(x['series'][-p:]), b : pd.Series(b_series)) , pairwise=True) \
                                                                .drop([x['name']], axis=1) \
                                                                .unstack(1) \
                                                                .droplevel(0, axis=1) \
                                                                .apply(lambda row: row[x['name']] / row[b], axis=1) \
                                                                .mean(), axis=1)

    data_dict[i] = mydf

我只运行了一组“A”并更改了周期。通过保持“A”不变并改变周期,我得到了这里显示的性能提升。在 period = 400 时,我的性能仍然提高了 60%。

A 20
Original: Total Time 25.74614143371582
Revised: Total Time 7.026344299316406

A 200
Original: Total Time 25.56810474395752
Revised: Total Time 10.015231847763062

A 400
Original: Total Time 28.221587419509888
Revised: Total Time 11.064109802246094

转到周期 501,您的原始代码耗时 1121.6251230239868 秒。我的差不多。从 400 到 501 会增加两个函数的大量时间(在每个基准测试中重复)。

如果您需要这些函数并且必须在分析时计算它们,您应该将时间花在这两个函数上。我发现使用 pandas 系列很慢,您会注意到我在一个实例中使用 scipy 模块进行关联,因为收益是值得的。如果您可以直接使用 numpy 或 scipy 模块来执行最后两个函数,那么您也会在那里看到收益。

另一个值得关注的地方是我使用 lambda 函数的地方。这仍然是逐行的,就像使用 for 循环一样。我正在保存期间系列,因此我可以使用以下计算:

def get_series(a):
    if len(a) == 2 :
        l1, l2 = a
        s = df[l1] - df[l2]
        return s.tolist()
    else:
        l1, l2, l3 = a
        s = df[l1] - df[l2] - df[l3]
        return s.tolist()

mydf['series'] = mydf['name'].apply(lambda x: get_series(x.split('/')))

这个系列由列表组成,并被传递到 lambda 函数中。我希望找到一种方法来通过同时计算所有行来对其进行矢量化,但是有些函数需要序列,有些使用列表,我只是可以弄清楚。这是一个例子:

mydf['quantile' + _str] =  mydf.apply(lambda x: stats.percentileofscore(x['series'][-p:], x['last']), axis=1)

如果您能弄清楚如何对其进行矢量化,然后将其应用于那些 p>500 的函数,您将会看到一些节省。

最后,你的代码还是我的代码,真正的问题是最后两个函数。其他一切都更小,但真实,节省并加起来,但重新考虑最后一块可以节省你的一天。

另一种选择是多进程或将其分解到多台机器上

【讨论】:

非常感谢。这么多非常有见地的小变化,今天仍在实施其中的一些,但它有很大帮助。谢谢【参考方案2】:

我在最里面的循环中更改了一行,这为包含 2,000 行的数据帧提供了 1.6 倍的加速。

这不会解决所有问题,但可能会有所帮助。

for b in benchmarks:
    ...
    
    if p > 500:
        attr[b + '_20d_rolling_beta_mean' + _str] =  (
            df1
            .rolling(5)  
            .cov(df1, pairwise=True)
            .drop([c], axis=1)
            .unstack(1)
            .droplevel(0, axis=1)
            # .apply(lambda row: row[c] / row[b], axis=1)                 # <-- removed
            .assign(result = lambda x: x[c] / x[b]).iloc[:, -1].squeeze() # <-- added
            .mean()
        )

A 中前 100 个连击的粗略计时信息(经过的时间):

应用语句:142.6 秒 assign 语句:90.1 秒

【讨论】:

哇,切换到分配是一个巨大的改进。谢谢 你看过Python内置的多处理docs吗?我认为键 A、B、...、J 是独立的。如果您可以将最后一个 for 循环编写为函数,则可以通过使用多个内核来减少总计算时间。 (你也需要这个函数来使用 Dask。) 谢谢。我已经阅读了 Dask 作为一种可能的解决方案。最初看它,我不确定我是否有实现它的技能,但会做更多的阅读,因为你是对的,AJ Keys 是独立的,所以异步解决它们可能是解决方案中非常方便的部分。

以上是关于Python Pandas 改进了目前需要约 400 分钟运行的大型数据集的计算时间的主要内容,如果未能解决你的问题,请参考以下文章

Pandas 函数方法汇总一览查询(持续补充改进)

Pandas 函数方法汇总一览查询(持续补充改进)

传递需要的大量 Json 对象时如何改进 Asp.net Mvc 应用程序?

Python/Pandas - 性能改进 - 将列分成多个部分并将字符串序列转换为列表

与 IPv4 相比,IPv6 都有哪些改进?

Python Pandas 用顶行替换标题