如何使用 python + NumPy / SciPy 计算滚动/移动平均值?
Posted
技术标签:
【中文标题】如何使用 python + NumPy / SciPy 计算滚动/移动平均值?【英文标题】:How to calculate rolling / moving average using python + NumPy / SciPy? 【发布时间】:2012-12-28 02:55:09 【问题描述】:似乎没有函数可以简单地计算 numpy/scipy 上的移动平均值,导致 convoluted solutions。
我的问题有两个:
使用 numpy(正确)实现移动平均线的最简单方法是什么? 既然这看起来不简单且容易出错,那么在这种情况下是否有充分的理由不使用 batteries included?【问题讨论】:
卷积解决方案对我来说似乎并不复杂! 移动平均线不只是一个低通滤波器(即“模糊”)吗?很确定这正是卷积的用途...... @mmgp 我想我希望是错的,或者有一个很好的、明显的原因。 @wim 这是一个双关语。但这个问题存在的事实意味着从 numpy.convolute 创建移动平均线并不简单。 Moving average or running mean的可能重复 【参考方案1】:如果您只想要一个简单的非加权移动平均线,您可以使用 编辑更正了 Bean 在代码中发现的错误索引。 编辑 所以我猜答案是:它真的很容易实现,也许 numpy 已经有点臃肿了,具有专门的功能。np.cumsum
轻松实现它,这可能 比基于 FFT 的方法快:
def moving_average(a, n=3) :
ret = np.cumsum(a, dtype=float)
ret[n:] = ret[n:] - ret[:-n]
return ret[n - 1:] / n
>>> a = np.arange(20)
>>> moving_average(a)
array([ 1., 2., 3., 4., 5., 6., 7., 8., 9., 10., 11.,
12., 13., 14., 15., 16., 17., 18.])
>>> moving_average(a, n=4)
array([ 1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5,
10.5, 11.5, 12.5, 13.5, 14.5, 15.5, 16.5, 17.5])
【讨论】:
此代码不正确。例如Moving_average([1,2,5,10], n=2) 给出 [ 1. , 3.5, 8.5]。甚至回答者对 0 到 19 的移动平均值的测试用例也不正确,声称 0、1 和 2 的平均值是 0.5。它是如何获得 6 票的? 感谢您的错误检查,它现在似乎工作正常。至于赞成票,我猜答案背后的一般想法比实施中的一个错误更重要,但谁知道呢。 我发现了问题。ret[n:] -= ret[:-n]
与 ret[n:] = ret[n:] - ret[:-n]
不同。我已经修复了这个答案中的代码。编辑:没有其他人刚刚击败我。
@Timmmm 我做到了,这确实是问题所在。这个答案背后的一般原则被广泛用于图像处理(他们称之为总面积表),所以问题必须在实现中。过早优化的一个很好的例子,因为我有点记得在原地执行操作“因为它会更有效”。从好的方面来说,它可能确实更快地产生了错误的答案......
嗯,看来这个“易于实现”的功能实际上很容易出错,并引发了关于内存效率的良好讨论。如果这意味着知道某事做对了,我很高兴臃肿。【参考方案2】:
NumPy 缺乏特定领域的特定功能可能是由于核心团队的纪律和忠实于 NumPy 的主要指令:提供 N 维数组类型,以及用于创建的函数,以及索引这些数组。与许多基本目标一样,这个目标并不小,而 NumPy 做得非常出色。
(大得多)SciPy 包含大得多的特定领域库(称为子包由 SciPy 开发者提供)——例如,数值优化 (optimize)、信号处理 (signal) 和积分 (integrate)。
我的猜测是,您所追求的功能至少在一个 SciPy 子包中(scipy.signal 可能);但是,我会首先查看 SciPy scikits 的集合,确定相关的 scikit 并在那里寻找感兴趣的功能。
Scikits 是基于 NumPy/SciPy 独立开发的包,针对特定的技术学科(例如,scikits-image、scikits-learn 等)其中一些(尤其是用于数值优化的出色的 OpenOpt)早在选择驻留在相对较新的 scikits 标题下之前就已受到高度重视,成熟的项目。上面喜欢的 Scikits 主页列出了大约 30 个这样的 scikits,尽管其中至少有几个不再处于积极开发中。
遵循此建议将引导您进入scikits-timeseries;但是,该软件包不再处于积极开发中;实际上,Pandas 已成为,AFAIK,de facto NumPy-based 时间序列库。
Pandas 有几个函数可以用来计算移动平均线;其中最简单的可能是 rolling_mean,您可以这样使用:
>>> # the recommended syntax to import pandas
>>> import pandas as PD
>>> import numpy as NP
>>> # prepare some fake data:
>>> # the date-time indices:
>>> t = PD.date_range('1/1/2010', '12/31/2012', freq='D')
>>> # the data:
>>> x = NP.arange(0, t.shape[0])
>>> # combine the data & index into a Pandas 'Series' object
>>> D = PD.Series(x, t)
现在,只需调用函数 rolling_mean 传入 Series 对象和 窗口大小,在下面的示例中是 10 天。
>>> d_mva = PD.rolling_mean(D, 10)
>>> # d_mva is the same size as the original Series
>>> d_mva.shape
(1096,)
>>> # though obviously the first w values are NaN where w is the window size
>>> d_mva[:3]
2010-01-01 NaN
2010-01-02 NaN
2010-01-03 NaN
验证它是否有效——例如,将原始系列中的值 10 - 15 与使用滚动平均值平滑的新系列进行比较
>>> D[10:15]
2010-01-11 2.041076
2010-01-12 2.041076
2010-01-13 2.720585
2010-01-14 2.720585
2010-01-15 3.656987
Freq: D
>>> d_mva[10:20]
2010-01-11 3.131125
2010-01-12 3.035232
2010-01-13 2.923144
2010-01-14 2.811055
2010-01-15 2.785824
Freq: D
函数 rolling_mean 以及大约十几个其他函数被非正式地分组在 Pandas 文档中的标题 moving window 函数下; Pandas 中的第二组相关函数称为指数加权函数(例如,ewma,它计算指数移动加权平均值)。第二组不包含在第一个(移动窗口函数)中的事实可能是因为指数加权变换不依赖于固定长度的窗口
【讨论】:
Pandas 确实拥有强大的移动窗口函数阵容。但在我看来,对于简单的移动平均线来说开销太大了。 我怀疑计算移动平均线是 OP 或其他任何人的孤立要求。如果您需要计算移动平均线,那么您几乎肯定有一个时间序列,这意味着您需要一个数据结构,允许您将日期时间索引与您的数据一致,这就是您所指的“开销”。 首先,感谢您抽出宝贵的时间来写这个内容丰富的答案。事实上,我看不到不涉及时间序列的移动平均线的用途。但这并不意味着需要使其符合日期时间,甚至符合特定的采样间隔(可能未知) 只是想补充一点,如果 pandas 作为依赖项似乎过于重量级,移动平均函数已被提取到 Bottleneck 库中。 'rolling_mean' 不再是 pandas 的一部分,请参阅使用 'rolling' 的回复【参考方案3】:如果您想仔细处理边缘条件(仅从边缘处的可用元素计算平均值),以下函数可以解决问题。
import numpy as np
def running_mean(x, N):
out = np.zeros_like(x, dtype=np.float64)
dim_len = x.shape[0]
for i in range(dim_len):
if N%2 == 0:
a, b = i - (N-1)//2, i + (N-1)//2 + 2
else:
a, b = i - (N-1)//2, i + (N-1)//2 + 1
#cap indices to min and max indices
a = max(0, a)
b = min(dim_len, b)
out[i] = np.mean(x[a:b])
return out
>>> running_mean(np.array([1,2,3,4]), 2)
array([1.5, 2.5, 3.5, 4. ])
>>> running_mean(np.array([1,2,3,4]), 3)
array([1.5, 2. , 3. , 3.5])
【讨论】:
【参考方案4】:这个使用 Pandas 的答案改编自上面,因为 rolling_mean
不再是 Pandas 的一部分
# the recommended syntax to import pandas
import pandas as pd
import numpy as np
# prepare some fake data:
# the date-time indices:
t = pd.date_range('1/1/2010', '12/31/2012', freq='D')
# the data:
x = np.arange(0, t.shape[0])
# combine the data & index into a Pandas 'Series' object
D = pd.Series(x, t)
现在,只需在具有窗口大小的数据帧上调用函数rolling
,在下面的示例中为 10 天。
d_mva10 = D.rolling(10).mean()
# d_mva is the same size as the original Series
# though obviously the first w values are NaN where w is the window size
d_mva10[:11]
2010-01-01 NaN
2010-01-02 NaN
2010-01-03 NaN
2010-01-04 NaN
2010-01-05 NaN
2010-01-06 NaN
2010-01-07 NaN
2010-01-08 NaN
2010-01-09 NaN
2010-01-10 4.5
2010-01-11 5.5
Freq: D, dtype: float64
【讨论】:
【参考方案5】:我觉得使用bottleneck可以轻松解决这个问题
请参阅下面的基本示例:
import numpy as np
import bottleneck as bn
a = np.random.randint(4, 1000, size=(5, 7))
mm = bn.move_mean(a, window=2, min_count=1)
这给出了沿每个轴的移动平均值。
“mm”是“a”的移动平均值。
“window”是移动均值考虑的最大条目数。
“min_count”是移动均值考虑的最小条目数(例如,对于第一个元素或数组是否具有 nan 值)。
Bottleneck 的优点是有助于处理 nan 值,而且效率也很高。
【讨论】:
Bottleneck 是一个很好的快速且易于实施的解决方案(赞成),但需要强调的是,它只提供“滞后”的运行方式,而不是居中,正如 argentum2f 所指出的那样【参考方案6】:实现此目的的一种简单方法是使用np.convolve
。
这背后的想法是利用discrete convolution 的计算方式并使用它来返回滚动平均值。这可以通过与长度等于我们想要的滑动窗口长度的np.ones
序列进行卷积来完成。
为此,我们可以定义以下函数:
def moving_average(x, w):
return np.convolve(x, np.ones(w), 'valid') / w
此函数将采用序列x
和长度为w
的序列的卷积。请注意,选择的mode
是valid
,因此卷积乘积仅适用于序列完全重叠的点。
一些例子:
x = np.array([5,3,8,10,2,1,5,1,0,2])
对于长度为2
的窗口的移动平均线,我们将:
moving_average(x, 2)
# array([4. , 5.5, 9. , 6. , 1.5, 3. , 3. , 0.5, 1. ])
对于一个长度为4
的窗口:
moving_average(x, 4)
# array([6.5 , 5.75, 5.25, 4.5 , 2.25, 1.75, 2. ])
convolve
是如何工作的?
让我们更深入地了解计算离散卷积的方式。
以下函数旨在复制np.convolve
计算输出值的方式:
def mov_avg(x, w):
for m in range(len(x)-(w-1)):
yield sum(np.ones(w) * x[m:m+w]) / w
对于上面的相同示例,也会产生:
list(mov_avg(x, 2))
# [4.0, 5.5, 9.0, 6.0, 1.5, 3.0, 3.0, 0.5, 1.0]
所以每一步要做的就是在数组和当前窗口之间取内积。在这种情况下,考虑到我们直接取序列的sum
,乘以np.ones(w)
是多余的。
下面是如何计算第一个输出的示例,以便更清楚一点。假设我们想要一个w=4
的窗口:
[1,1,1,1]
[5,3,8,10,2,1,5,1,0,2]
= (1*5 + 1*3 + 1*8 + 1*10) / w = 6.5
以下输出将被计算为:
[1,1,1,1]
[5,3,8,10,2,1,5,1,0,2]
= (1*3 + 1*8 + 1*10 + 1*2) / w = 5.75
以此类推,在所有重叠执行完毕后返回序列的移动平均值。
【讨论】:
这是个好主意!它比 @Jaime 对小 n 的回答要快,但对于较大的 n 会变慢。 有时让输出数组与输入数组大小相同很有用。为此,mode='valid'
可以替换为'same'
。只是在这种情况下,边缘点将趋向于零。
在函数数组'x'的某些元素可能为None或0的情况下,如何从该函数中获取返回值的对应'x'值?此函数返回的数组的大小可以小于提供给它的数组“x”。
是否应该澄清一下,这种方法会产生“居中移动平均线”,而不是经常为金融应用计算的“简单移动平均线”? en.wikipedia.org/wiki/Moving_average【参考方案7】:
我实际上想要一个与接受的答案略有不同的行为。我正在为sklearn
管道构建移动平均特征提取器,因此我要求移动平均的输出与输入具有相同的维度。我想要的是移动平均线假设系列保持不变,即窗口 2 的移动平均线 [1,2,3,4,5]
将给出 [1.5,2.5,3.5,4.5,5.0]
。
对于列向量(我的用例),我们得到
def moving_average_col(X, n):
z2 = np.cumsum(np.pad(X, ((n,0),(0,0)), 'constant', constant_values=0), axis=0)
z1 = np.cumsum(np.pad(X, ((0,n),(0,0)), 'constant', constant_values=X[-1]), axis=0)
return (z1-z2)[(n-1):-1]/n
对于数组
def moving_average_array(X, n):
z2 = np.cumsum(np.pad(X, (n,0), 'constant', constant_values=0))
z1 = np.cumsum(np.pad(X, (0,n), 'constant', constant_values=X[-1]))
return (z1-z2)[(n-1):-1]/n
当然,不必为填充假定恒定值,但在大多数情况下这样做就足够了。
【讨论】:
【参考方案8】:这里有多种方法可以做到这一点,以及一些基准。最好的方法是使用来自其他库的优化代码的版本。 bottleneck.move_mean
方法可能是最好的。 scipy.convolve
方法也非常快速、可扩展,并且在语法和概念上都很简单,但是对于非常大的窗口值不能很好地扩展。如果您需要纯 numpy
方法,numpy.cumsum
方法很好。
注意:其中一些(例如bottleneck.move_mean
)没有居中,会改变您的数据。
import numpy as np
import scipy as sci
import scipy.signal as sig
import pandas as pd
import bottleneck as bn
import time as time
def rollavg_direct(a,n):
'Direct "for" loop'
assert n%2==1
b = a*0.0
for i in range(len(a)) :
b[i]=a[max(i-n//2,0):min(i+n//2+1,len(a))].mean()
return b
def rollavg_comprehension(a,n):
'List comprehension'
assert n%2==1
r,N = int(n/2),len(a)
return np.array([a[max(i-r,0):min(i+r+1,N)].mean() for i in range(N)])
def rollavg_convolve(a,n):
'scipy.convolve'
assert n%2==1
return sci.convolve(a,np.ones(n,dtype='float')/n, 'same')[n//2:-n//2+1]
def rollavg_convolve_edges(a,n):
'scipy.convolve, edge handling'
assert n%2==1
return sci.convolve(a,np.ones(n,dtype='float'), 'same')/sci.convolve(np.ones(len(a)),np.ones(n), 'same')
def rollavg_cumsum(a,n):
'numpy.cumsum'
assert n%2==1
cumsum_vec = np.cumsum(np.insert(a, 0, 0))
return (cumsum_vec[n:] - cumsum_vec[:-n]) / n
def rollavg_cumsum_edges(a,n):
'numpy.cumsum, edge handling'
assert n%2==1
N = len(a)
cumsum_vec = np.cumsum(np.insert(np.pad(a,(n-1,n-1),'constant'), 0, 0))
d = np.hstack((np.arange(n//2+1,n),np.ones(N-n)*n,np.arange(n,n//2,-1)))
return (cumsum_vec[n+n//2:-n//2+1] - cumsum_vec[n//2:-n-n//2]) / d
def rollavg_roll(a,n):
'Numpy array rolling'
assert n%2==1
N = len(a)
rolling_idx = np.mod((N-1)*np.arange(n)[:,None] + np.arange(N), N)
return a[rolling_idx].mean(axis=0)[n-1:]
def rollavg_roll_edges(a,n):
# see https://***.com/questions/42101082/fast-numpy-roll
'Numpy array rolling, edge handling'
assert n%2==1
a = np.pad(a,(0,n-1-n//2), 'constant')*np.ones(n)[:,None]
m = a.shape[1]
idx = np.mod((m-1)*np.arange(n)[:,None] + np.arange(m), m) # Rolling index
out = a[np.arange(-n//2,n//2)[:,None], idx]
d = np.hstack((np.arange(1,n),np.ones(m-2*n+1+n//2)*n,np.arange(n,n//2,-1)))
return (out.sum(axis=0)/d)[n//2:]
def rollavg_pandas(a,n):
'Pandas rolling average'
return pd.DataFrame(a).rolling(n, center=True, min_periods=1).mean().to_numpy()
def rollavg_bottlneck(a,n):
'bottleneck.move_mean'
return bn.move_mean(a, window=n, min_count=1)
N = 10**6
a = np.random.rand(N)
functions = [rollavg_direct, rollavg_comprehension, rollavg_convolve,
rollavg_convolve_edges, rollavg_cumsum, rollavg_cumsum_edges,
rollavg_pandas, rollavg_bottlneck, rollavg_roll, rollavg_roll_edges]
print('Small window (n=3)')
%load_ext memory_profiler
for f in functions :
print('\n'+f.__doc__+ ' : ')
%timeit b=f(a,3)
print('\nLarge window (n=1001)')
for f in functions[0:-2] :
print('\n'+f.__doc__+ ' : ')
%timeit b=f(a,1001)
print('\nMemory\n')
print('Small window (n=3)')
N = 10**7
a = np.random.rand(N)
%load_ext memory_profiler
for f in functions[2:] :
print('\n'+f.__doc__+ ' : ')
%memit b=f(a,3)
print('\nLarge window (n=1001)')
for f in functions[2:-2] :
print('\n'+f.__doc__+ ' : ')
%memit b=f(a,1001)
定时,小窗口 (n=3)
Direct "for" loop :
4.14 s ± 23.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
List comprehension :
3.96 s ± 27.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
scipy.convolve :
1.07 ms ± 26.7 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
scipy.convolve, edge handling :
4.68 ms ± 9.69 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
numpy.cumsum :
5.31 ms ± 5.11 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
numpy.cumsum, edge handling :
8.52 ms ± 11.1 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Pandas rolling average :
9.85 ms ± 9.63 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
bottleneck.move_mean :
1.3 ms ± 12.2 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Numpy array rolling :
31.3 ms ± 91.9 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
Numpy array rolling, edge handling :
61.1 ms ± 55.9 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
时序,大窗口 (n=1001)
Direct "for" loop :
4.67 s ± 34 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
List comprehension :
4.46 s ± 14.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
scipy.convolve :
103 ms ± 165 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
scipy.convolve, edge handling :
272 ms ± 1.23 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
numpy.cumsum :
5.19 ms ± 12.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
numpy.cumsum, edge handling :
8.7 ms ± 11.5 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Pandas rolling average :
9.67 ms ± 199 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
bottleneck.move_mean :
1.31 ms ± 15.7 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
内存,小窗口 (n=3)
The memory_profiler extension is already loaded. To reload it, use:
%reload_ext memory_profiler
scipy.convolve :
peak memory: 362.66 MiB, increment: 73.61 MiB
scipy.convolve, edge handling :
peak memory: 510.24 MiB, increment: 221.19 MiB
numpy.cumsum :
peak memory: 441.81 MiB, increment: 152.76 MiB
numpy.cumsum, edge handling :
peak memory: 518.14 MiB, increment: 228.84 MiB
Pandas rolling average :
peak memory: 449.34 MiB, increment: 160.02 MiB
bottleneck.move_mean :
peak memory: 374.17 MiB, increment: 75.54 MiB
Numpy array rolling :
peak memory: 661.29 MiB, increment: 362.65 MiB
Numpy array rolling, edge handling :
peak memory: 1111.25 MiB, increment: 812.61 MiB
内存,大窗口 (n=1001)
scipy.convolve :
peak memory: 370.62 MiB, increment: 71.83 MiB
scipy.convolve, edge handling :
peak memory: 521.98 MiB, increment: 223.18 MiB
numpy.cumsum :
peak memory: 451.32 MiB, increment: 152.52 MiB
numpy.cumsum, edge handling :
peak memory: 527.51 MiB, increment: 228.71 MiB
Pandas rolling average :
peak memory: 451.25 MiB, increment: 152.50 MiB
bottleneck.move_mean :
peak memory: 374.64 MiB, increment: 75.85 MiB
【讨论】:
【参考方案9】:talib 包含一个简单的移动平均线工具,以及其他类似的平均线工具(即指数移动平均线)。下面将该方法与其他一些解决方案进行比较。
%timeit pd.Series(np.arange(100000)).rolling(3).mean()
2.53 ms ± 40.5 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit talib.SMA(real = np.arange(100000.), timeperiod = 3)
348 µs ± 3.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
%timeit moving_average(np.arange(100000))
638 µs ± 45.1 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
需要注意的是,实数必须具有dtype = float
的元素。否则会引发以下错误
例外:real 不是 double
【讨论】:
【参考方案10】:这是一个使用 numba 的快速实现(注意类型)。请注意,它确实包含移位的 nan。
import numpy as np
import numba as nb
@nb.jit(nb.float64[:](nb.float64[:],nb.int64),
fastmath=True,nopython=True)
def moving_average( array, window ):
ret = np.cumsum(array)
ret[window:] = ret[window:] - ret[:-window]
ma = ret[window - 1:] / window
n = np.empty(window-1); n.fill(np.nan)
return np.concatenate((n.ravel(), ma.ravel()))
【讨论】:
这会在开头返回 nans。【参考方案11】:for i in range(len(Data)):
Data[i, 1] = Data[i-lookback:i, 0].sum() / lookback
试试这段代码。我认为它更简单并且可以完成工作。 回顾是移动平均线的窗口。
在Data[i-lookback:i, 0].sum()
中,我已将0
用于引用数据集的第一列,但您可以放置任何您喜欢的列,以防您有多个列。
【讨论】:
【参考方案12】:移动平均线
迭代器方法
在 i 处反转数组,并简单地从 i 到 n 取平均值。
使用列表推导即时生成迷你数组。
x = np.random.randint(10, size=20)
def moving_average(arr, n):
return [ (arr[:i+1][::-1][:n]).mean() for i, ele in enumerate(arr) ]
d = 5
moving_average(x, d)
张量卷积
moving_average = np.convolve(x, np.ones(d)/d, mode='valid')
【讨论】:
感谢您的两个解决方案。我使用示例数据x
运行两者,但它给出了不同的结果。你能解释一下原因吗?【参考方案13】:
我使用accepted answer 的解决方案,稍作修改以具有与输入相同的输出长度,或者使用另一个答案的评论中提到的pandas
' 版本。我在这里总结了一个可重复的示例以供将来参考:
import numpy as np
import pandas as pd
def moving_average(a, n):
ret = np.cumsum(a, dtype=float)
ret[n:] = ret[n:] - ret[:-n]
return ret / n
def moving_average_centered(a, n):
return pd.Series(a).rolling(window=n, center=True).mean().to_numpy()
A = [0, 0, 1, 2, 4, 5, 4]
print(moving_average(A, 3))
# [0. 0. 0.33333333 1. 2.33333333 3.66666667 4.33333333]
print(moving_average_centered(A, 3))
# [nan 0.33333333 1. 2.33333333 3.66666667 4.33333333 nan ]
【讨论】:
【参考方案14】:通过将下面的解决方案与使用 numpy 的 cumsum 的解决方案进行比较,这个解决方案几乎花费了 一半的时间。这是因为它不需要遍历整个数组来做 cumsum 然后做所有的减法。此外,如果数组很大并且数量很大(可能溢出),则 cumsum 可能是“dangerous”。当然,这里也存在危险,但至少只是将基本数字加在一起。
def moving_average(array_numbers, n):
if n > len(array_numbers):
return []
temp_sum = sum(array_numbers[:n])
averages = [temp_sum / float(n)]
for first_index, item in enumerate(array_numbers[n:]):
temp_sum += item - array_numbers[first_index]
averages.append(temp_sum / float(n))
return averages
【讨论】:
【参考方案15】:从Numpy 1.20
开始,sliding_window_view
提供了一种在元素窗口中滑动/滚动的方式。然后您可以单独平均的窗口。
例如,对于4
-element 窗口:
from numpy.lib.stride_tricks import sliding_window_view
# values = np.array([5, 3, 8, 10, 2, 1, 5, 1, 0, 2])
np.average(sliding_window_view(values, window_shape = 4), axis=1)
# array([6.5, 5.75, 5.25, 4.5, 2.25, 1.75, 2])
注意sliding_window_view
的中间结果:
# values = np.array([5, 3, 8, 10, 2, 1, 5, 1, 0, 2])
sliding_window_view(values, window_shape = 4)
# array([[ 5, 3, 8, 10],
# [ 3, 8, 10, 2],
# [ 8, 10, 2, 1],
# [10, 2, 1, 5],
# [ 2, 1, 5, 1],
# [ 1, 5, 1, 0],
# [ 5, 1, 0, 2]])
【讨论】:
【参考方案16】:所有答案似乎都集中在预计算列表的情况上。对于实际运行的用例,数字一个一个出现,这里有一个简单的类,它提供对最后 N 个值进行平均的服务:
import numpy as np
class RunningAverage():
def __init__(self, stack_size):
self.stack = [0 for _ in range(stack_size)]
self.ptr = 0
self.full_cycle = False
def add(self,value):
self.stack[self.ptr] = value
self.ptr += 1
if self.ptr == len(self.stack):
self.full_cycle = True
self.ptr = 0
def get_avg(self):
if self.full_cycle:
return np.mean(self.stack)
else:
return np.mean(self.stack[:self.ptr])
用法:
N = 50 # size of the averaging window
run_avg = RunningAverage(N)
for i in range(1000):
value = <my computation>
run_avg.add(value)
if i % 20 ==0: # print once in 20 iters:
print(f'the average value is run_avg.get_avg()')
【讨论】:
“运行”(实际上是问题中的滚动/移动)不是指流数据。它指的是沿着数据推送的移动窗口。这就是为什么答案集中于此。【参考方案17】:移动平均很简单:
def avg(x,n=1):
return avg((x[1:]+x[:-1])/2 ,n-1) if n>1 else (x[1:]+x[:-1])/2
numpy 平均是:
def avg(x,n=1):
return np.convolve(x, np.ones(n):Valid)/n
点数最多300个数据点 ryzen 5 5900 单核更快,上面那个numpy更快...
最多 5000 个数据点使用 multiprocessing 库 24 线程(进程)ryzen 5 5900 更快。再次超过 numpy 更快。
CuPy 使用 GPU 是另一回事:
import cupy as cp
import numpy as np
def avgnp(x,n=1):
return np.convolve(x,np.ones(n),mode='valid')/n
def avgcp(x,n=1):
return cp.convolve(x,cp.ones(n)/n,mode='full')
print('Moving Average for, ')
for i in range(3,8):
s=time.time();
t_cpu=np.linspace(0,2*np.pi,10**i)
x_cpu=np.sin(t_cpu)
avgnp(x_cpu,5)
print(10**i,'data points via Numpy(CPU):\t',time.time()-s,'*')
time.sleep(1)
s=time.time()
t=cp.linspace(0,2*np.pi,10**i)
x=cp.sin(t)
avgcp(x,5)
print(10**i,'data points via CuPy(GPU) :\t',time.time()-s)
time.sleep(1)
结果是:
Moving Average for,
1000 data points via Numpy(CPU): 0.0005373954772949219 *
1000 data points via CuPy(GPU) : 0.00930333137512207
10000 data points via Numpy(CPU): 0.00034356117248535156 *
10000 data points via CuPy(GPU) : 0.0007426738739013672
100000 data points via Numpy(CPU): 0.0014045238494873047 *
100000 data points via CuPy(GPU) : 0.0006258487701416016
1000000 data points via Numpy(CPU): 0.014684438705444336 *
1000000 data points via CuPy(GPU) : 0.0006639957427978516
10000000 data points via Numpy(CPU): 0.1448352336883545 *
10000000 data points via CuPy(GPU) : 0.0006761550903320312
【讨论】:
【参考方案18】:如果有人需要一个简单的解决方案,这里有一个
def moving_average(a,n):
N=len(a)
return np.array([np.mean(a[i:i+n]) for i in np.arange(0,N-n+1)])
您可以通过在np.arange(0,N-n+1,step)
中添加 step 参数来更改窗口之间的重叠
【讨论】:
好答案,我只建议返回 NumPy 数组而不是 Python 列表。 瞧,正如你所说。以上是关于如何使用 python + NumPy / SciPy 计算滚动/移动平均值?的主要内容,如果未能解决你的问题,请参考以下文章
将 Python 代码转换为 Android 的 apk? [关闭]