许多数据帧上的高效 Python Pandas 股票 Beta 计算

Posted

技术标签:

【中文标题】许多数据帧上的高效 Python Pandas 股票 Beta 计算【英文标题】:Efficient Python Pandas Stock Beta Calculation on Many Dataframes 【发布时间】:2017-01-22 21:35:21 【问题描述】:

我有许多 (4000+) 个 CSV 的股票数据(日期、开盘价、最高价、最低价、收盘价),我将它们导入到单个 Pandas 数据框中进行分析。我是 python 新手,想为每只股票计算 12 个月的滚动 beta,我找到了一个计算滚动 beta 的帖子 (Python pandas calculate rolling stock beta using rolling apply to groupby object in vectorized fashion) 但是在下面的代码中使用时需要超过 2.5 小时!考虑到我可以在 3 分钟内在 SQL 表中运行完全相同的计算,这太慢了。

如何提高以下代码的性能以匹配 SQL 的性能?我了解 Pandas/python 具有这种能力。我当前的方法循环遍历每一行,我知道这会降低性能,但我不知道有任何聚合方式可以在数据帧上执行滚动窗口 beta 计算。

注意:将 CSV 加载到单个数据框并计算每日回报的前 2 个步骤仅需约 20 秒。我所有的 CSV 数据帧都存储在名为“FilesLoaded”的字典中,名称为“XAO”。

您的帮助将不胜感激! 谢谢你:)

import pandas as pd, numpy as np
import datetime
import ntpath
pd.set_option('precision',10)  #Set the Decimal Point precision to DISPLAY
start_time=datetime.datetime.now()

MarketIndex = 'XAO'
period = 250
MinBetaPeriod = period
# ***********************************************************************************************
# CALC RETURNS 
# ***********************************************************************************************
for File in FilesLoaded:
    FilesLoaded[File]['Return'] = FilesLoaded[File]['Close'].pct_change()
# ***********************************************************************************************
# CALC BETA
# ***********************************************************************************************
def calc_beta(df):
    np_array = df.values
    m = np_array[:,0] # market returns are column zero from numpy array
    s = np_array[:,1] # stock returns are column one from numpy array
    covariance = np.cov(s,m) # Calculate covariance between stock and market
    beta = covariance[0,1]/covariance[1,1]
    return beta

#Build Custom "Rolling_Apply" function
def rolling_apply(df, period, func, min_periods=None):
    if min_periods is None:
        min_periods = period
    result = pd.Series(np.nan, index=df.index)
    for i in range(1, len(df)+1):
        sub_df = df.iloc[max(i-period, 0):i,:]
        if len(sub_df) >= min_periods:  
            idx = sub_df.index[-1]
            result[idx] = func(sub_df)
    return result

#Create empty BETA dataframe with same index as RETURNS dataframe
df_join = pd.DataFrame(index=FilesLoaded[MarketIndex].index)    
df_join['market'] = FilesLoaded[MarketIndex]['Return']
df_join['stock'] = np.nan

for File in FilesLoaded:
    df_join['stock'].update(FilesLoaded[File]['Return'])
    df_join  = df_join.replace(np.inf, np.nan) #get rid of infinite values "inf" (SQL won't take "Inf")
    df_join  = df_join.replace(-np.inf, np.nan)#get rid of infinite values "inf" (SQL won't take "Inf")
    df_join  = df_join.fillna(0) #get rid of the NaNs in the return data
    FilesLoaded[File]['Beta'] = rolling_apply(df_join[['market','stock']], period, calc_beta, min_periods = MinBetaPeriod)

# ***********************************************************************************************
# CLEAN-UP
# ***********************************************************************************************
print('Run-time: 0'.format(datetime.datetime.now() - start_time))

【问题讨论】:

【参考方案1】:

基于numpy和pandas创建了一个简单的python包finance-calculator来计算包括beta在内的财务比率。我正在使用简单的公式(as per investopedia):

beta = covariance(returns, benchmark returns) / variance(benchmark returns)

在 pandas 中直接计算协方差和方差,这使其速度更快。使用包中的api也很简单:

import finance_calculator as fc
beta = fc.get_beta(scheme_data, benchmark_data, tail=False)

如果 tail 为真,它将为您提供日期和 beta 或最后一个 beta 值的数据框。

【讨论】:

【参考方案2】:

这是最简单、最快的解决方案

接受的答案对于我需要的东西来说太慢了,而且我不理解声称更快的解决方案背后的数学。他们也给出了不同的答案,但公平地说,我可能只是搞砸了。

我认为您不需要创建自定义滚动函数来使用 pandas 1.1.4(甚至至少从 0.19 开始)计算 beta。下面的代码假设数据的格式与上述问题相同——带有日期索引的 pandas 数据框、股票某些周期性的百分比回报率以及市场价值位于名为“市场”的列中。

如果你没有这种格式,我建议加入股票回报到市场回报,以确保与指数相同:

# Use .pct_change() only if joining Close data
beta_data = stock_data.join(market_data), how = 'inner').pct_change().dropna()

之后,就是协方差除以方差。


ticker_covariance = beta_data.rolling(window).cov()
# Limit results to the stock (i.e. column name for the stock) vs. 'Market' covariance
ticker_covariance = ticker_covariance.loc[pd.IndexSlice[:, stock], 'Market'].dropna()
benchmark_variance = beta_data['Market'].rolling(window).var().dropna()
beta = ticker_covariance / benchmark_variance

注意:如果您有一个多索引,则必须删除非日期级别才能使用 rolling().apply() 解决方案。我只对一只股票和一个市场进行了测试。如果您有多个股票,则可能需要在 .loc 之后修改ticker_covariance 方程。最后,如果您想计算整个窗口之前时期的 beta 值(例如,stock_data 始于 1 年前,但您使用的是 3 年的数据),那么您可以将上面的内容修改为并扩展(而不是滚动)窗口同样的计算,然后 .combine_first() 两者。

【讨论】:

【参考方案3】:

进一步优化@piRSquared 的速度和内存实现。为了清楚起见,代码也被简化了。

from numpy import nan, ndarray, ones_like, vstack, random
from numpy.lib.stride_tricks import as_strided
from numpy.linalg import pinv
from pandas import DataFrame, date_range

def calc_beta(s: ndarray, m: ndarray):
  x = vstack((ones_like(m), m))
  b = pinv(x.dot(x.T)).dot(x).dot(s)
  return b[1]

def rolling_calc_beta(s_df: DataFrame, m_df: DataFrame, period: int):
  result = ndarray(shape=s_df.shape, dtype=float)
  l, w = s_df.shape
  ls, ws = s_df.values.strides
  result[0:period - 1, :] = nan
  s_arr = as_strided(s_df.values, shape=(l - period + 1, period, w), strides=(ls, ls, ws))
  m_arr = as_strided(m_df.values, shape=(l - period + 1, period), strides=(ls, ls))
  for row in range(period, l):
    result[row, :] = calc_beta(s_arr[row - period, :], m_arr[row - period])
  return DataFrame(data=result, index=s_df.index, columns=s_df.columns)

if __name__ == '__main__':
  num_sec_dfs, num_periods = 4000, 480

  dates = date_range('1995-12-31', periods=num_periods, freq='M', name='Date')
  stocks = DataFrame(data=random.rand(num_periods, num_sec_dfs), index=dates,
                   columns=['s:04d'.format(i) for i in 
                            range(num_sec_dfs)]).pct_change()
  market = DataFrame(data=random.rand(num_periods), index=dates, columns= 
              ['Market']).pct_change()
  betas = rolling_calc_beta(stocks, market, 12)

%timeit betas = rolling_calc_beta(stocks, market, 12)

每个循环 335 毫秒 ± 2.69 毫秒(7 次运行的平均值 ± 标准偏差,每次 1 个循环)

【讨论】:

【参考方案4】:

虽然将输入数据集有效细分为​​滚动窗口对于优化整体计算很重要,但也可以显着提高 beta 计算本身的性能。

以下仅优化将数据集细分为滚动窗口:

def numpy_betas(x_name, window, returns_data, intercept=True):
    if intercept:
        ones = numpy.ones(window)

    def lstsq_beta(window_data):
        x_data = numpy.vstack([window_data[x_name], ones]).T if intercept else window_data[[x_name]]
        beta_arr, residuals, rank, s = numpy.linalg.lstsq(x_data, window_data)
        return beta_arr[0]

    indices = [int(x) for x in numpy.arange(0, returns_data.shape[0] - window + 1, 1)]
    return DataFrame(
        data=[lstsq_beta(returns_data.iloc[i:(i + window)]) for i in indices]
        , columns=list(returns_data.columns)
        , index=returns_data.index[window - 1::1]
    )

以下还优化了 beta 计算本身:

def custom_betas(x_name, window, returns_data):
    window_inv = 1.0 / window
    x_sum = returns_data[x_name].rolling(window, min_periods=window).sum()
    y_sum = returns_data.rolling(window, min_periods=window).sum()
    xy_sum = returns_data.mul(returns_data[x_name], axis=0).rolling(window, min_periods=window).sum()
    xx_sum = numpy.square(returns_data[x_name]).rolling(window, min_periods=window).sum()
    xy_cov = xy_sum - window_inv * y_sum.mul(x_sum, axis=0)
    x_var = xx_sum - window_inv * numpy.square(x_sum)
    betas = xy_cov.divide(x_var, axis=0)[window - 1:]
    betas.columns.name = None
    return betas

比较两种不同计算的性能,您可以看到随着 beta 计算中使用的窗口增加,第二种方法的性能大大优于第一种:

将性能与@piRSquared 的实现进行比较,自定义方法的评估时间大约为 350 毫秒,而评估时间超过 2 秒。

【讨论】:

【参考方案5】:

但当您需要跨日期 (m) 为多个股票 (n) 计算结果 (m x n) 数量时进行 beta 计算,这些将是块状的。

在多个内核上运行每个日期或库存可以减轻一些负担,但最终您将拥有庞大的硬件。

可用解决方案的主要时间要求是找到方差和协方差,并且在(指数和股票)数据中应避免使用 NaN,以便按照 pandas==0.23 进行正确计算。 0.

除非计算被缓存,否则再次运行将导致愚蠢的举动。

如果 NaN 未删除,则 numpy 方差和协方差版本也会发生错误计算 beta。

大量数据必须使用 Cython 实现。

【讨论】:

在我看来,这应该作为问题的 cmets 发布,因为它并不是一个真正连贯的答案,只是一系列评论。如果您还没有评论特权,那么我建议您去赚取获得此特权所需的几个声望点。【参考方案6】:

生成随机股票数据 4,000 只股票的 20 年月度数据

dates = pd.date_range('1995-12-31', periods=480, freq='M', name='Date')
stoks = pd.Index(['s:04d'.format(i) for i in range(4000)])
df = pd.DataFrame(np.random.rand(480, 4000), dates, stoks)

df.iloc[:5, :5]


滚动功能 返回 groupby 对象准备应用自定义函数 见Source

def roll(df, w):
    # stack df.values w-times shifted once at each stack
    roll_array = np.dstack([df.values[i:i+w, :] for i in range(len(df.index) - w + 1)]).T
    # roll_array is now a 3-D array and can be read into
    # a pandas panel object
    panel = pd.Panel(roll_array, 
                     items=df.index[w-1:],
                     major_axis=df.columns,
                     minor_axis=pd.Index(range(w), name='roll'))
    # convert to dataframe and pivot + groupby
    # is now ready for any action normally performed
    # on a groupby object
    return panel.to_frame().unstack().T.groupby(level=0)

Beta 函数 使用 OLS 回归的封闭形式解决方案 假设第 0 列是市场 见Source

def beta(df):
    # first column is the market
    X = df.values[:, [0]]
    # prepend a column of ones for the intercept
    X = np.concatenate([np.ones_like(X), X], axis=1)
    # matrix algebra
    b = np.linalg.pinv(X.T.dot(X)).dot(X.T).dot(df.values[:, 1:])
    return pd.Series(b[1], df.columns[1:], name='Beta')

演示

rdf = roll(df, 12)
betas = rdf.apply(beta)

时机


验证 与 OP 比较计算

def calc_beta(df):
    np_array = df.values
    m = np_array[:,0] # market returns are column zero from numpy array
    s = np_array[:,1] # stock returns are column one from numpy array
    covariance = np.cov(s,m) # Calculate covariance between stock and market
    beta = covariance[0,1]/covariance[1,1]
    return beta

print(calc_beta(df.iloc[:12, :2]))

-0.311757542437

print(beta(df.iloc[:12, :2]))

s0001   -0.311758
Name: Beta, dtype: float64

注意第一个单元格 是否与上述经过验证的计算值相同

betas = rdf.apply(beta)
betas.iloc[:5, :5]


对评论的回应 具有模拟多个数据框的完整工作示例

num_sec_dfs = 4000

cols = ['Open', 'High', 'Low', 'Close']
dfs = 's:04d'.format(i): pd.DataFrame(np.random.rand(480, 4), dates, cols) for i in range(num_sec_dfs)

market = pd.Series(np.random.rand(480), dates, name='Market')

df = pd.concat([market] + [dfs[k].Close.rename(k) for k in dfs.keys()], axis=1).sort_index(1)

betas = roll(df.pct_change().dropna(), 12).apply(beta)

for c, col in betas.iteritems():
    dfs[c]['Beta'] = col

dfs['s0001'].head(20)

【讨论】:

感谢您的详细回复!然而,每个数据帧 2.3 秒大约是我现在获得的速度(x4000 = 很长一段时间)。有更快的方法吗?如前所述,我可以在 3 分钟内使用 SQL 表完成所有 4000 多个操作。 @cwse 2.3 秒代表 240 个月和 4,000 只股票。 SQL 可以在 3 分钟内完成。 pandas + numpy 可以在 3 秒内完成 @cwse 我刚刚在 240 个月内模拟了 40,000 个股票回报,在 40 秒内运行了 12 个月的滚动测试版。 哇!!真是史诗级的!谢谢队友:) 这看起来非常适合我需要做的事情,我想知道您是否能够使用 DataFrames 多级索引发布更新的解决方案,因为 Panel 已经折旧。谢谢:)【参考方案7】:

使用生成器提高内存效率

模拟数据

m, n = 480, 10000
dates = pd.date_range('1995-12-31', periods=m, freq='M', name='Date')
stocks = pd.Index(['s:04d'.format(i) for i in range(n)])
df = pd.DataFrame(np.random.rand(m, n), dates, stocks)
market = pd.Series(np.random.rand(m), dates, name='Market')
df = pd.concat([df, market], axis=1)

Beta 计算

def beta(df, market=None):
    # If the market values are not passed,
    # I'll assume they are located in a column
    # named 'Market'.  If not, this will fail.
    if market is None:
        market = df['Market']
        df = df.drop('Market', axis=1)
    X = market.values.reshape(-1, 1)
    X = np.concatenate([np.ones_like(X), X], axis=1)
    b = np.linalg.pinv(X.T.dot(X)).dot(X.T).dot(df.values)
    return pd.Series(b[1], df.columns, name=df.index[-1])

滚动功能 这会返回一个生成器,并且内存效率会更高

def roll(df, w):
    for i in range(df.shape[0] - w + 1):
        yield pd.DataFrame(df.values[i:i+w, :], df.index[i:i+w], df.columns)

把它们放在一起

betas = pd.concat([beta(sdf) for sdf in roll(df.pct_change().dropna(), 12)], axis=1).T

验证

OP beta 计算

def calc_beta(df):
    np_array = df.values
    m = np_array[:,0] # market returns are column zero from numpy array
    s = np_array[:,1] # stock returns are column one from numpy array
    covariance = np.cov(s,m) # Calculate covariance between stock and market
    beta = covariance[0,1]/covariance[1,1]
    return beta

实验设置

m, n = 12, 2
dates = pd.date_range('1995-12-31', periods=m, freq='M', name='Date')

cols = ['Open', 'High', 'Low', 'Close']
dfs = 's:04d'.format(i): pd.DataFrame(np.random.rand(m, 4), dates, cols) for i in range(n)

market = pd.Series(np.random.rand(m), dates, name='Market')

df = pd.concat([market] + [dfs[k].Close.rename(k) for k in dfs.keys()], axis=1).sort_index(1)

betas = pd.concat([beta(sdf) for sdf in roll(df.pct_change().dropna(), 12)], axis=1).T

for c, col in betas.iteritems():
    dfs[c]['Beta'] = col

dfs['s0000'].head(20)

calc_beta(df[['Market', 's0000']])

0.0020118230147777435

注意: 计算是一样的

【讨论】:

谢谢..这似乎已经到了那里但是答案不完整?这一切都运行了,但我没有得到肯定的 beta 值……我得到了小数,但它们不正确。滚动功能中不再有 3D 阵列和面板? 内存效率问题让我思考。这个 beta calc 是一样的,但是 roll 是完全不同的。另一个答案中的面板是一种巧妙的方式,可以将所有内容组合成一个可以反复计算的通用结构。使用生成器仅使用它在 beta 计算时所需的数据帧切片,然后继续。无论如何,我会 dbl chk 计算结果。 我最近遇到以下问题:“df = pd.concat([market] + [dfs[k].Close.rename(k) for k in dfs.keys()], axis=1).sort_index(1)" 在 Python 3.4 中...似乎对 dict 键的迭代不适用于重命名函数...我收到此错误:TypeError: 'str' object is not callable (Python)我尝试了谷歌搜索解决方案并尝试了不同的迭代方法..list()、iter()、items().. over the dict并且无法让它工作!!我希望你能帮忙!谢谢!! @cwse "df = pd.concat([market] + [dfs[k].Close.rename(k) for k, v in dfs.items()],axis=1)。 sort_index(1)" @cwse "df = pd.concat([market] + [v.Close.rename(k) for k, v in dfs.items()], axis=1).sort_index(1 )"

以上是关于许多数据帧上的高效 Python Pandas 股票 Beta 计算的主要内容,如果未能解决你的问题,请参考以下文章

在数据帧上的 pandas groupby 之后循环遍历组

在 Python Pandas 的数据帧上使用字符串方法?

使用 pandas 在数据帧上执行 groupby,按计数排序并获取 python 中的前 2 个计数

数据帧上的 spark GROUPED_MAP udf 是不是并行运行?

大型python数据帧上的快速数据处理

python数据帧上的平均值和最大值