Python 中最大回撤的开始、结束和持续时间
Posted
技术标签:
【中文标题】Python 中最大回撤的开始、结束和持续时间【英文标题】:Start, End and Duration of Maximum Drawdown in Python 【发布时间】:2014-05-01 16:29:20 【问题描述】:给定一个时间序列,我想计算最大回撤,我还想定位最大回撤的起点和终点,以便计算持续时间。我想在时间序列图上标记回撤的开始和结束,如下所示:
到目前为止,我已经获得了生成随机时间序列的代码,并且已经获得了计算最大回撤的代码。如果有人知道如何识别回撤开始和结束的地方,我将不胜感激!
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
# create random walk which I want to calculate maximum drawdown for:
T = 50
mu = 0.05
sigma = 0.2
S0 = 20
dt = 0.01
N = round(T/dt)
t = np.linspace(0, T, N)
W = np.random.standard_normal(size = N)
W = np.cumsum(W)*np.sqrt(dt) ### standard brownian motion ###
X = (mu-0.5*sigma**2)*t + sigma*W
S = S0*np.exp(X) ### geometric brownian motion ###
plt.plot(S)
# Max drawdown function
def max_drawdown(X):
mdd = 0
peak = X[0]
for x in X:
if x > peak:
peak = x
dd = (peak - x) / peak
if dd > mdd:
mdd = dd
return mdd
drawSeries = max_drawdown(S)
MaxDD = abs(drawSeries.min()*100)
print MaxDD
plt.show()
【问题讨论】:
您可能对此感兴趣:***.com/questions/21058333/… 这里有一个类似的问题有一个有用的答案(虽然对于熊猫):quant.stackexchange.com/questions/55130/… 【参考方案1】:只要找出运行最大值减去当前值最大的地方:
n = 1000
xs = np.random.randn(n).cumsum()
i = np.argmax(np.maximum.accumulate(xs) - xs) # end of the period
j = np.argmax(xs[:i]) # start of period
plt.plot(xs)
plt.plot([i, j], [xs[i], xs[j]], 'o', color='Red', markersize=10)
【讨论】:
实现最大回撤的真正干净解决方案! 如果你不介意,你能解释一下 i 和 j 的代码吗? For i: np.maximum.accumulate(xs) 为我们提供了累积最大值。取那个和 xs 之间的差并找到它的 argmax 给我们提供了累积回撤最大化的位置。然后对于 j: xs[:i] 获取从周期开始到 i 点的所有点,最大回撤结束。 np.argmax(xs[:i]) 找到图中最高(最大)点的位置/索引,直到该点,这就是我们正在寻找的峰值。 如果没有回撤(所有点都高于前一个),此方法会抛出错误。应检查 i == 0 是否为 true,则回撤也为 0 能否请您展示如何将真实的“日期”添加到此缩编图的 x 轴?到目前为止,代码有效,但仅适用于 numpy 数组。如果时间序列以带有时间戳作为索引的 pandas 系列的方式出现怎么办?换句话说,在情节上显示真实日期会非常好,这样您就可以了解您看待事物的时间范围。谢谢!【参考方案2】:如果这对任何人有帮助,我添加了水下分析...
def drawdowns(equity_curve):
i = np.argmax(np.maximum.accumulate(equity_curve.values) - equity_curve.values) # end of the period
j = np.argmax(equity_curve.values[:i]) # start of period
drawdown=abs(100.0*(equity_curve[i]-equity_curve[j]))
DT=equity_curve.index.values
start_dt=pd.to_datetime(str(DT[j]))
MDD_start=start_dt.strftime ("%Y-%m-%d")
end_dt=pd.to_datetime(str(DT[i]))
MDD_end=end_dt.strftime ("%Y-%m-%d")
NOW=pd.to_datetime(str(DT[-1]))
NOW=NOW.strftime ("%Y-%m-%d")
MDD_duration=np.busday_count(MDD_start, MDD_end)
try:
UW_dt=equity_curve[i:].loc[equity_curve[i:].values>=equity_curve[j]].index.values[0]
UW_dt=pd.to_datetime(str(UW_dt))
UW_dt=UW_dt.strftime ("%Y-%m-%d")
UW_duration=np.busday_count(MDD_end, UW_dt)
except:
UW_dt="0000-00-00"
UW_duration=np.busday_count(MDD_end, NOW)
return MDD_start, MDD_end, MDD_duration, drawdown, UW_dt, UW_duration
【讨论】:
【参考方案3】:behzad.nouri 解决方案非常干净,但它不是最大提取时间(无法发表评论,因为我刚刚开设了我的帐户并且我没有足够的声誉 atm)。
您最终得到的是名义价值的最大下降,而不是价值的相对下降(百分比下降)。例如,如果您将此应用于长期上升的时间序列(例如股票市场指数标准普尔 500 指数),则最近的价值下降(较高的名义价值下降)将优先于较早的价值下降,因为只要名义价值/点的下降幅度更大。
例如标准普尔 500:
2007-08金融危机,下跌56.7%,888.62点 近期冠状病毒危机,下跌 33.9%,1,1148.75 点通过将此方法应用于 2000 年之后的时期,您将看到冠状病毒危机而不是 2007-08 年金融危机
以下相关代码(来自 behzad.nouri):
n = 1000
xs = np.random.randn(n).cumsum()
i = np.argmax(np.maximum.accumulate(xs) - xs) # end of the period
j = np.argmax(xs[:i]) # start of period
plt.plot(xs)
plt.plot([i, j], [xs[i], xs[j]], 'o', color='Red', markersize=10)
您只需将该名义值的下降除以最大累积量即可获得相对 (%) 下降。
( np.maximum.accumulate(xs) - xs ) / np.maximum.accumulate(xs)
【讨论】:
【参考方案4】:您的 max_drawdown 已经记录了峰值位置。修改if
,使其在存储mdd时也存储结束位置mdd_end
,以及return mdd, peak, mdd_end
。
【讨论】:
【参考方案5】:我同意 k0rnik。
证明 behzad.nouri 给出的公式可能产生错误结果的简短示例。
xs = [1, 50, 10, 180, 40, 200]
pos_min1 = np.argmax(np.maximum.accumulate(xs) - xs) # end of the period
pos_peak1 = np.argmax(xs[:pos_min1]) # start of period
pos_min2 = np.argmax((np.maximum.accumulate(xs) -
xs)/np.maximum.accumulate(xs)) # end of the period
pos_peak2 = np.argmax(xs[:pos_min2]) # start of period
plt.plot(xs)
plt.plot([pos_min1, pos_peak1], [xs[pos_min1], xs[pos_peak1]], 'o',
label="mdd 1", color='Red', markersize=10)
plt.plot([pos_min2, pos_peak2], [xs[pos_min2], xs[pos_peak2]], 'o',
label="mdd 2", color='Green', markersize=10)
plt.legend()
mdd1 = 100 * (xs[pos_min1] - xs[pos_peak1]) / xs[pos_peak1]
mdd2 = 100 * (xs[pos_min2] - xs[pos_peak2]) / xs[pos_peak2]
print(f"solution 1: peak xs[pos_peak1], min xs[pos_min1]\n rate :
mdd1\n")
print(f"solution 2: peak xs[pos_peak2], min xs[pos_min2]\n rate :
mdd2")
此外,资产的价格不能为负数,所以
xs = np.random.randn(n).cumsum()
不正确。最好添加:
xs -= (np.min(xs) - 10)
【讨论】:
【参考方案6】:这个解决方案已经过测试并且有效,但这里我计算的是最大持续时间回撤,而不是最大回撤的持续时间。该解决方案可以轻松调整以找到最大回撤的持续时间。
def max_dur_drawdown(dfw, threshold=0.05):
"""
Labels all drawdowns larger in absolute value than a threshold and returns the
drawdown of maximum duration (not the max drawdown necessarily but most often they
coincide).
Args:
dfw (pd.DataFrame): monthly data, the pre-computed drawdowns or underwater.
threshold (float): only look at drawdowns greater than this in absolute value e.g. 5%
Returns:
dictionary containing the start, end dates and duration in months for the maximum
duration drawdowns keyed by column name.
"""
max_dur_per_column =
columns = dfw.columns.copy()
mddd_start =
mddd_end =
mddd_duration =
for col in columns:
# run the drawdown labeling algorithm
dfw['sign'] = 0
dfw['sign'].loc[dfw[col] == 0] = +1
dfw['sign'].loc[dfw[col] < 0] = -1
# find the sign change data points
dfw['change'] = dfw['sign'] != dfw['sign'].shift(1)
# the first change doesn't count
dfw['change'].iloc[0] = False
# demarcate the lef and right of the drawdowns
left = dfw[(dfw['change'] == True) & (dfw['sign'] == -1)].index.values
right = dfw[(dfw['change'] == True) & (dfw['sign'] == 1)].index.values
min_len = min(len(left), len(right))
intervals = pd.IntervalIndex.from_arrays(left[0:min_len], right[0:min_len])
# find the minimum value per drawdown interval so we label all data points to the left of it.
min_per_int = list(map(lambda i: (i.left, i.right, dfw[col][(dfw.index >= i.left) & (dfw.index < i.right)].min()), intervals))
# filter out drawdowns lower in absolute value than a threshold
min_per_int = list(filter(None.__ne__, list(map(lambda x: None if x[2] >= -threshold else x, min_per_int))))
# label only the negative part of the underwater NDD stands for negative-side drawdown.
dfw['NDD'] = 0
mddd_start[col] = None
mddd_end[col] = None
mddd_duration[col] = 0
for i in min_per_int:
# find the index of the data point that is minimum this is an argmin
min_idx = dfw[(dfw.index >= i[0]) & (dfw.index < i[1]) & (abs(dfw[col] - i[2]) < 1e-15)].index[0]
# compute the duration and update the maximum duration if needed
tmp_dur = int(np.round((min_idx - i[0]) / np.timedelta64(1, 'M')))
if tmp_dur > mddd_duration[col]:
mddd_start[col] = i[0].date()
mddd_end[col] = min_idx.date()
mddd_duration[col] = tmp_dur
return mddd_start, mddd_end, mddd_duration
示例用法:
# compute cumulative returns
dfc = pd.DataFrame(dfr['S&P500'] / dfr['S&P500'][0])
# compute drawdowns
dfw = dfc / dfc.cummax() - 1
print(max_dur_drawdown(dfw))
【讨论】:
以上是关于Python 中最大回撤的开始、结束和持续时间的主要内容,如果未能解决你的问题,请参考以下文章
Django Query:如何从开始和结束时间字段中找到最长持续时间?
如何找出给定日期和开始和结束时间可用的所有持续时间:java