NumPy 版本的“指数加权移动平均线”,相当于 pandas.ewm().mean()
Posted
技术标签:
【中文标题】NumPy 版本的“指数加权移动平均线”,相当于 pandas.ewm().mean()【英文标题】:NumPy version of "Exponential weighted moving average", equivalent to pandas.ewm().mean() 【发布时间】:2017-08-09 17:13:52 【问题描述】:如何在 NumPy 中获得指数加权移动平均线,就像 pandas 中的以下内容一样?
import pandas as pd
import pandas_datareader as pdr
from datetime import datetime
# Declare variables
ibm = pdr.get_data_yahoo(symbols='IBM', start=datetime(2000, 1, 1), end=datetime(2012, 1, 1)).reset_index(drop=True)['Adj Close']
windowSize = 20
# Get PANDAS exponential weighted moving average
ewm_pd = pd.DataFrame(ibm).ewm(span=windowSize, min_periods=windowSize).mean().as_matrix()
print(ewm_pd)
我用 NumPy 尝试了以下操作
import numpy as np
import pandas_datareader as pdr
from datetime import datetime
# From this post: http://***.com/a/40085052/3293881 by @Divakar
def strided_app(a, L, S): # Window len = L, Stride len/stepsize = S
nrows = ((a.size - L) // S) + 1
n = a.strides[0]
return np.lib.stride_tricks.as_strided(a, shape=(nrows, L), strides=(S * n, n))
def numpyEWMA(price, windowSize):
weights = np.exp(np.linspace(-1., 0., windowSize))
weights /= weights.sum()
a2D = strided_app(price, windowSize, 1)
returnArray = np.empty((price.shape[0]))
returnArray.fill(np.nan)
for index in (range(a2D.shape[0])):
returnArray[index + windowSize-1] = np.convolve(weights, a2D[index])[windowSize - 1:-windowSize + 1]
return np.reshape(returnArray, (-1, 1))
# Declare variables
ibm = pdr.get_data_yahoo(symbols='IBM', start=datetime(2000, 1, 1), end=datetime(2012, 1, 1)).reset_index(drop=True)['Adj Close']
windowSize = 20
# Get NumPy exponential weighted moving average
ewma_np = numpyEWMA(ibm, windowSize)
print(ewma_np)
但是结果和pandas中的不一样。
是否有更好的方法可以直接在 NumPy 中计算指数加权移动平均值并获得与 pandas.ewm().mean()
完全相同的结果?
在 pandas 解决方案的 60,000 个请求中,我得到大约 230 秒。我确信使用纯 NumPy 可以显着减少。
【问题讨论】:
为什么不直接使用您已经拥有的可工作的 Pandas 代码? 因为 Pandas 的性能较慢,我寻找矢量化 numpy 的替代品 更改标题以强调性能问题。对于我们这些只是为了方便起见而对查找 numpy.ewm 感兴趣的人,这个问题出现在搜索结果的顶部,并不是我们想要的。 【参考方案1】:推导:
实施:
def ema(p:np.ndarray, a:float) -> np.ndarray:
o = np.empty(p.shape, p.dtype)
# (1-α)^0, (1-α)^1, (1-α)^2, ..., (1-α)^n
np.power(1.0 - a, np.arange(0.0, p.shape[0], 1.0, p.dtype), o)
# α*P0, α*P1, α*P2, ..., α*Pn
np.multiply(a, p, p)
# α*P0/(1-α)^0, α*P1/(1-α)^1, α*P2/(1-α)^2, ..., α*Pn/(1-α)^n
np.divide(p, o, p)
# α*P0/(1-α)^0, α*P0/(1-α)^0 + α*P1/(1-α)^1, ...
np.cumsum(p, out=p)
# (α*P0/(1-α)^0)*(1-α)^0, (α*P0/(1-α)^0 + α*P1/(1-α)^1)*(1-α)^1, ...
np.multiply(p, o, o)
return o
注意:输入将被覆盖。
【讨论】:
【参考方案2】:指数滤波器与一阶 IIR 滤波器不一样吗? 你为什么不试试这个:
from scipy import signal
signal.lfilter([alpha], [1, alpha-1], data)
其中 alpha 的范围是 0 到 1
【讨论】:
【参考方案3】:下面的所有答案都不考虑缺失值,所以我给出我的版本,它假设 nan 并且结果匹配 pandas EWM。我用 numba 来加速,比 pandas 的实现快十倍。
matrix = df.values
@jit(nopython=True, nogil=True, parallel=True)
def ewm_mean(arr_in, com):
'''
calculate the exponential moving average for each column
$y_t=\fracewm_tw_t$
$ewm_t = ewm_t-1 (1 - \alpha) + x_t$
$w_t = w_t-1 (1 - \alpha) + 1$
arr_in->ndarray(dtype=float64): ewm per column
com->int: $\alpha = 1/com + 1$
'''
t, m = arr_in.shape
ewma = np.empty((t, m), dtype=float32)
alpha = 1 / (com + 1)
# the size of blocks depending on the device, number of blocks should match the number of cores on your machine
sizeOfblock = 1000
numberOfblock = m // sizeOfblock
assert sizeOfblock*numberOfblock == m, "wrong split"
# main loop
for nb in prange(numberOfblock): # split columns to blocks
w = np.where(np.isnan(arr_in[0, nb*sizeOfblock: (nb+1)*sizeOfblock]), 0., 1.)
ewma_old = arr_in[0, nb*sizeOfblock: (nb+1)*sizeOfblock]
ewma[0, nb*sizeOfblock: (nb+1)*sizeOfblock] = ewma_old
for i in range(1, t): # accumulate row by row
data_now = arr_in[i, nb*sizeOfblock: (nb+1)*sizeOfblock]
ewma_old = np.where(np.isnan(ewma_old), 0., ewma_old)*(1-alpha) + data_now
if np.isnan(np.sum(data_now)): # check nan
nan_pos = np.isnan(data_now)
w = w * (1 - alpha) + np.where(nan_pos, 0., 1.)
d = ewma_old / w
ewma[i, nb*sizeOfblock: (nb+1)*sizeOfblock] = np.where(nan_pos, ewma[i-1, nb*sizeOfblock: (nb+1)*sizeOfblock], d)
else:
w = w * (1 - alpha) + 1.
d = ewma_old / w
ewma[i, nb*sizeOfblock: (nb+1)*sizeOfblock] = d
return ewma
np.isclose(df.ewm(com=2).mean().values, ewm_mean(matrix, com=2), equal_nan=True).all()
数据集如下所示。
import pandas as pd
import numpy as np
np.random.seed(0)
df = pd.DataFrame(np.random.normal(0, 10, size=(4000, 5000)))
df.iloc[0:800, 1000:2000] = np.nan
df.iloc[800:1600, 2000:3000] = np.nan
df.iloc[1600:2400, 3000:4000] = np.nan
df.iloc[2400:3200, 4000:5000] = np.nan
注意:这段代码花费大量时间处理 nan 数字,因此我将数据拆分为块并并行计算。如果您的数据没有 nan 值,您可以进行微小的更改并显着加快速度。此外,您可以编写更复杂的逻辑来将数据分配给内核。 sizeOfblock
是一个参数;玩它。
【讨论】:
【参考方案4】:避免 numba 并且对于大型数组在 Alexander McFarlane's solution 的因子 2 内的一个非常简单的解决方案是使用 scipy 的 lfilter
函数(因为 EWMA 是线性过滤器):
from scipy.signal import lfiltic, lfilter
# careful not to mix between scipy.signal and standard python signal
# (https://docs.python.org/3/library/signal.html) if your code handles some processes
def ewma_linear_filter(array, window):
alpha = 2 /(window + 1)
b = [alpha]
a = [1, alpha-1]
zi = lfiltic(b, a, array[0:1], [0])
return lfilter(b, a, array, zi=zi)[0]
时间安排如下:
window = 100 # doesn't have any impact on run time
for n in [1000, 10_000, 100_000, 1_000_000, 10_000_000, 100_000_000]:
data = np.random.normal(0, 1, n)
print(f'n=n:,d, window=window')
%timeit _ewma_infinite_hist(data, window)
%timeit ewma_linear_filter(data, window)
print()
n=1,000, window=100
5.01 µs ± 23.4 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
58.4 µs ± 1.05 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
n=10,000, window=100
39 µs ± 101 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
134 µs ± 387 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
n=100,000, window=100
373 µs ± 2.56 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
845 µs ± 2.27 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
n=1,000,000, window=100
5.35 ms ± 22 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
9.77 ms ± 78.1 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
n=10000000, window=100
53.7 ms ± 200 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
96.6 ms ± 2.28 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
n=10,0000,000, window=100
547 ms ± 5.02 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
963 ms ± 4.52 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
【讨论】:
v 很好——你能添加一个例子来展示两者之间的关系吗?它可能有助于扫描读者明确地看到你匹配无限历史而不是pandas.ewm(span).mean()
每个OP
我似乎无法重现我的帖子的原始时间(作为参考,_ewma_infinite_hist
为 86 毫秒,linear_filter
为 92.6 毫秒)。我已经用一系列参数的更新时间更新了帖子。这是在锁定到 2.16 GHz 的 i7-10875H 移动 CPU 上运行的。【参考方案5】:
2019 年 8 月 6 日更新
适用于大输入的纯 NUMPY、快速和矢量化解决方案
out
用于就地计算的参数,
dtype
参数,
索引order
参数
这个函数相当于pandas的@987654326@,但是要快得多。 ewm(adjust=True).mean()
(pandas 的默认值)可以在结果的开头产生不同的值。我正在努力将adjust
功能添加到此解决方案中。
@Divakar's answer 在输入过大时会导致浮点精度问题。这是因为(1-alpha)**(n+1) -> 0
当n -> inf
和alpha -> 1
时,导致计算中弹出除零和NaN
值。
这是我最快的解决方案,没有精度问题,几乎完全矢量化。它变得有点复杂,但性能很棒,尤其是对于非常大的输入。不使用就地计算(可以使用out
参数,节省内存分配时间):100M 元素输入向量为 3.62 秒,100K 元素输入向量为 3.2ms,5000 元素输入向量为 293µs相当旧的 PC(结果会因不同的 alpha
/row_size
值而异)。
# tested with python3 & numpy 1.15.2
import numpy as np
def ewma_vectorized_safe(data, alpha, row_size=None, dtype=None, order='C', out=None):
"""
Reshapes data before calculating EWMA, then iterates once over the rows
to calculate the offset without precision issues
:param data: Input data, will be flattened.
:param alpha: scalar float in range (0,1)
The alpha parameter for the moving average.
:param row_size: int, optional
The row size to use in the computation. High row sizes need higher precision,
low values will impact performance. The optimal value depends on the
platform and the alpha being used. Higher alpha values require lower
row size. Default depends on dtype.
:param dtype: optional
Data type used for calculations. Defaults to float64 unless
data.dtype is float32, then it will use float32.
:param order: 'C', 'F', 'A', optional
Order to use when flattening the data. Defaults to 'C'.
:param out: ndarray, or None, optional
A location into which the result is stored. If provided, it must have
the same shape as the desired output. If not provided or `None`,
a freshly-allocated array is returned.
:return: The flattened result.
"""
data = np.array(data, copy=False)
if dtype is None:
if data.dtype == np.float32:
dtype = np.float32
else:
dtype = np.float
else:
dtype = np.dtype(dtype)
row_size = int(row_size) if row_size is not None
else get_max_row_size(alpha, dtype)
if data.size <= row_size:
# The normal function can handle this input, use that
return ewma_vectorized(data, alpha, dtype=dtype, order=order, out=out)
if data.ndim > 1:
# flatten input
data = np.reshape(data, -1, order=order)
if out is None:
out = np.empty_like(data, dtype=dtype)
else:
assert out.shape == data.shape
assert out.dtype == dtype
row_n = int(data.size // row_size) # the number of rows to use
trailing_n = int(data.size % row_size) # the amount of data leftover
first_offset = data[0]
if trailing_n > 0:
# set temporary results to slice view of out parameter
out_main_view = np.reshape(out[:-trailing_n], (row_n, row_size))
data_main_view = np.reshape(data[:-trailing_n], (row_n, row_size))
else:
out_main_view = out
data_main_view = data
# get all the scaled cumulative sums with 0 offset
ewma_vectorized_2d(data_main_view, alpha, axis=1, offset=0, dtype=dtype,
order='C', out=out_main_view)
scaling_factors = (1 - alpha) ** np.arange(1, row_size + 1)
last_scaling_factor = scaling_factors[-1]
# create offset array
offsets = np.empty(out_main_view.shape[0], dtype=dtype)
offsets[0] = first_offset
# iteratively calculate offset for each row
for i in range(1, out_main_view.shape[0]):
offsets[i] = offsets[i - 1] * last_scaling_factor + out_main_view[i - 1, -1]
# add the offsets to the result
out_main_view += offsets[:, np.newaxis] * scaling_factors[np.newaxis, :]
if trailing_n > 0:
# process trailing data in the 2nd slice of the out parameter
ewma_vectorized(data[-trailing_n:], alpha, offset=out_main_view[-1, -1],
dtype=dtype, order='C', out=out[-trailing_n:])
return out
def get_max_row_size(alpha, dtype=float):
assert 0. <= alpha < 1.
# This will return the maximum row size possible on
# your platform for the given dtype. I can find no impact on accuracy
# at this value on my machine.
# Might not be the optimal value for speed, which is hard to predict
# due to numpy's optimizations
# Use np.finfo(dtype).eps if you are worried about accuracy
# and want to be extra safe.
epsilon = np.finfo(dtype).tiny
# If this produces an OverflowError, make epsilon larger
return int(np.log(epsilon)/np.log(1-alpha)) + 1
一维 ewma 函数:
def ewma_vectorized(data, alpha, offset=None, dtype=None, order='C', out=None):
"""
Calculates the exponential moving average over a vector.
Will fail for large inputs.
:param data: Input data
:param alpha: scalar float in range (0,1)
The alpha parameter for the moving average.
:param offset: optional
The offset for the moving average, scalar. Defaults to data[0].
:param dtype: optional
Data type used for calculations. Defaults to float64 unless
data.dtype is float32, then it will use float32.
:param order: 'C', 'F', 'A', optional
Order to use when flattening the data. Defaults to 'C'.
:param out: ndarray, or None, optional
A location into which the result is stored. If provided, it must have
the same shape as the input. If not provided or `None`,
a freshly-allocated array is returned.
"""
data = np.array(data, copy=False)
if dtype is None:
if data.dtype == np.float32:
dtype = np.float32
else:
dtype = np.float64
else:
dtype = np.dtype(dtype)
if data.ndim > 1:
# flatten input
data = data.reshape(-1, order)
if out is None:
out = np.empty_like(data, dtype=dtype)
else:
assert out.shape == data.shape
assert out.dtype == dtype
if data.size < 1:
# empty input, return empty array
return out
if offset is None:
offset = data[0]
alpha = np.array(alpha, copy=False).astype(dtype, copy=False)
# scaling_factors -> 0 as len(data) gets large
# this leads to divide-by-zeros below
scaling_factors = np.power(1. - alpha, np.arange(data.size + 1, dtype=dtype),
dtype=dtype)
# create cumulative sum array
np.multiply(data, (alpha * scaling_factors[-2]) / scaling_factors[:-1],
dtype=dtype, out=out)
np.cumsum(out, dtype=dtype, out=out)
# cumsums / scaling
out /= scaling_factors[-2::-1]
if offset != 0:
offset = np.array(offset, copy=False).astype(dtype, copy=False)
# add offsets
out += offset * scaling_factors[1:]
return out
2D ewma 函数:
def ewma_vectorized_2d(data, alpha, axis=None, offset=None, dtype=None, order='C', out=None):
"""
Calculates the exponential moving average over a given axis.
:param data: Input data, must be 1D or 2D array.
:param alpha: scalar float in range (0,1)
The alpha parameter for the moving average.
:param axis: The axis to apply the moving average on.
If axis==None, the data is flattened.
:param offset: optional
The offset for the moving average. Must be scalar or a
vector with one element for each row of data. If set to None,
defaults to the first value of each row.
:param dtype: optional
Data type used for calculations. Defaults to float64 unless
data.dtype is float32, then it will use float32.
:param order: 'C', 'F', 'A', optional
Order to use when flattening the data. Ignored if axis is not None.
:param out: ndarray, or None, optional
A location into which the result is stored. If provided, it must have
the same shape as the desired output. If not provided or `None`,
a freshly-allocated array is returned.
"""
data = np.array(data, copy=False)
assert data.ndim <= 2
if dtype is None:
if data.dtype == np.float32:
dtype = np.float32
else:
dtype = np.float64
else:
dtype = np.dtype(dtype)
if out is None:
out = np.empty_like(data, dtype=dtype)
else:
assert out.shape == data.shape
assert out.dtype == dtype
if data.size < 1:
# empty input, return empty array
return out
if axis is None or data.ndim < 2:
# use 1D version
if isinstance(offset, np.ndarray):
offset = offset[0]
return ewma_vectorized(data, alpha, offset, dtype=dtype, order=order,
out=out)
assert -data.ndim <= axis < data.ndim
# create reshaped data views
out_view = out
if axis < 0:
axis = data.ndim - int(axis)
if axis == 0:
# transpose data views so columns are treated as rows
data = data.T
out_view = out_view.T
if offset is None:
# use the first element of each row as the offset
offset = np.copy(data[:, 0])
elif np.size(offset) == 1:
offset = np.reshape(offset, (1,))
alpha = np.array(alpha, copy=False).astype(dtype, copy=False)
# calculate the moving average
row_size = data.shape[1]
row_n = data.shape[0]
scaling_factors = np.power(1. - alpha, np.arange(row_size + 1, dtype=dtype),
dtype=dtype)
# create a scaled cumulative sum array
np.multiply(
data,
np.multiply(alpha * scaling_factors[-2], np.ones((row_n, 1), dtype=dtype),
dtype=dtype)
/ scaling_factors[np.newaxis, :-1],
dtype=dtype, out=out_view
)
np.cumsum(out_view, axis=1, dtype=dtype, out=out_view)
out_view /= scaling_factors[np.newaxis, -2::-1]
if not (np.size(offset) == 1 and offset == 0):
offset = offset.astype(dtype, copy=False)
# add the offsets to the scaled cumulative sums
out_view += offset[:, np.newaxis] * scaling_factors[np.newaxis, 1:]
return out
用法:
data_n = 100000000
data = ((0.5*np.random.randn(data_n)+0.5) % 1) * 100
span = 5000 # span >= 1
alpha = 2/(span+1) # for pandas` span parameter
# com = 1000 # com >= 0
# alpha = 1/(1+com) # for pandas` center-of-mass parameter
# halflife = 100 # halflife > 0
# alpha = 1 - np.exp(np.log(0.5)/halflife) # for pandas` half-life parameter
result = ewma_vectorized_safe(data, alpha)
只是提示
很容易计算给定alpha
的“窗口大小”(从技术上讲,指数平均值具有无限的“窗口”),具体取决于该窗口中的数据对平均值的贡献。例如,这对于选择由于边界效应而将结果的起始部分视为不可靠的部分很有用。
def window_size(alpha, sum_proportion):
# Increases with increased sum_proportion and decreased alpha
# solve (1-alpha)**window_size = (1-sum_proportion) for window_size
return int(np.log(1-sum_proportion) / np.log(1-alpha))
alpha = 0.02
sum_proportion = .99 # window covers 99% of contribution to the moving average
window = window_size(alpha, sum_proportion) # = 227
sum_proportion = .75 # window covers 75% of contribution to the moving average
window = window_size(alpha, sum_proportion) # = 68
此线程中使用的alpha = 2 / (window_size + 1.0)
关系(pandas 中的“跨度”选项)是上述函数(使用sum_proportion~=0.87
)的逆函数的非常粗略的近似。 alpha = 1 - np.exp(np.log(1-sum_proportion)/window_size)
更准确(pandas 的“半衰期”选项等于该公式与 sum_proportion=0.5
)。
在以下示例中,data
表示连续噪声信号。 cutoff_idx
是 result
中的第一个位置,其中至少 99% 的值取决于 data
中的单独值(即少于 1% 取决于 data[0])。直到cutoff_idx
的数据被排除在最终结果之外,因为它过于依赖data
中的第一个值,因此可能会扭曲平均值。
result = ewma_vectorized_safe(data, alpha, chunk_size)
sum_proportion = .99
cutoff_idx = window_size(alpha, sum_proportion)
result = result[cutoff_idx:]
为了说明上面解决的问题,你可以运行几次,注意红线经常出现的错误开始,在cutoff_idx
之后被跳过:
data_n = 100000
data = np.random.rand(data_n) * 100
window = 1000
sum_proportion = .99
alpha = 1 - np.exp(np.log(1-sum_proportion)/window)
result = ewma_vectorized_safe(data, alpha)
cutoff_idx = window_size(alpha, sum_proportion)
x = np.arange(start=0, stop=result.size)
import matplotlib.pyplot as plt
plt.plot(x[:cutoff_idx+1], result[:cutoff_idx+1], '-r',
x[cutoff_idx:], result[cutoff_idx:], '-b')
plt.show()
请注意 cutoff_idx==window
,因为 alpha 是使用 window_size()
函数的逆函数设置的,具有相同的 sum_proportion
。
这类似于 pandas 应用 ewm(span=window, min_periods=window)
的方式。
【讨论】:
老实说,我不明白为什么对于大跨度(例如 5000),与 pandasewm(adjust=False).mean()
相比,此函数给出不同的结果
太棒了!感谢分享!你能写一个2D ewma函数用法的例子吗,真的需要帮助^_^
@yogazining:你只需要给它你的 2D 矩阵、alpha 参数和你想要平均的轴。例如像这样:result = ewma_vectorized_2d(input, alpha, axis=1)
。结果将是一个二维矩阵,其中的值是输入的轴 1 上的 ewma 平均值。所有参数在代码 cmets 中有更详细的描述。但要小心,因为对于非常大的输入(当您要使用的轴大于 get_max_row_size(alpha)
时)您会遇到精度问题,因此请务必检查。有解决方法,如果需要请告诉我
关于@yogazining 的问题:ewma_vectorized_safe
函数用于一维输入。可以为 2D 输入创建 ewma_vectorized_2d_safe
函数,该函数将执行所有精度检查和整形以使用 2D 输入而无需担心精度。如果有人需要这样的功能,请告诉我,编写和测试它需要一些时间,但如果有足够的需求,我会抽出时间
@ArtemAlexandrov 你有可重现的例子吗?【参考方案6】:
这是我对具有无限窗口大小的一维输入数组的实现。由于它使用大数,因此它仅适用于元素绝对值
想法是将输入数组reshape成有限长度的slice,这样就不会发生溢出,然后分别对每个slice进行ewm计算。
def ewm(x, alpha):
"""
Returns the exponentially weighted mean y of a numpy array x with scaling factor alpha
y[0] = x[0]
y[j] = (1. - alpha) * y[j-1] + alpha * x[j], for j > 0
x -- 1D numpy array
alpha -- float
"""
n = int(-100. / np.log(1.-alpha)) # Makes sure that the first and last elements in f are very big and very small (about 1e22 and 1e-22)
f = np.exp(np.arange(1-n, n, 2) * (0.5 * np.log(1. - alpha))) # Scaling factor for each slice
tmp = (np.resize(x, ((len(x) + n - 1) // n, n)) / f * alpha).cumsum(axis=1) * f # Get ewm for each slice of length n
# Add the last value of each previous slice to the next slice with corresponding scaling factor f and return result
return np.resize(tmp + np.tensordot(np.append(x[0], np.roll(tmp.T[n-1], 1)[1:]), f * ((1. - alpha) / f[0]), axes=0), len(x))
【讨论】:
【参考方案7】:最快的 EWMA 23x pandas
问题是严格要求numpy
解决方案,但是,似乎OP 实际上只是在纯numpy
解决方案之后加快运行时间。
我解决了一个类似的问题,但转而关注numba.jit
,它极大地加快了计算时间
In [24]: a = np.random.random(10**7)
...: df = pd.Series(a)
In [25]: %timeit numpy_ewma(a, 10) # /a/42915307/4013571
...: %timeit df.ewm(span=10).mean() # pandas
...: %timeit numpy_ewma_vectorized_v2(a, 10) # best w/o numba: /a/42926270/4013571
...: %timeit _ewma(a, 10) # fastest accurate (below)
...: %timeit _ewma_infinite_hist(a, 10) # fastest overall (below)
4.14 s ± 116 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
991 ms ± 52.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
396 ms ± 8.39 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
181 ms ± 1.01 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
39.6 ms ± 979 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
缩小为a = np.random.random(100)
的较小数组(结果顺序相同)
41.6 µs ± 491 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
945 ms ± 12 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
16 µs ± 93.5 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
1.66 µs ± 13.7 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
1.14 µs ± 5.57 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
还值得指出的是,我下面的函数与pandas
完全相同(参见 docstr 中的示例),而这里的一些答案采用了各种不同的近似值。例如,
In [57]: print(pd.DataFrame([1,2,3]).ewm(span=2).mean().values.ravel())
...: print(numpy_ewma_vectorized_v2(np.array([1,2,3]), 2))
...: print(numpy_ewma(np.array([1,2,3]), 2))
[1. 1.75 2.61538462]
[1. 1.66666667 2.55555556]
[1. 1.18181818 1.51239669]
我为自己的库记录的源代码
import numpy as np
from numba import jit
from numba import float64
from numba import int64
@jit((float64[:], int64), nopython=True, nogil=True)
def _ewma(arr_in, window):
r"""Exponentialy weighted moving average specified by a decay ``window``
to provide better adjustments for small windows via:
y[t] = (x[t] + (1-a)*x[t-1] + (1-a)^2*x[t-2] + ... + (1-a)^n*x[t-n]) /
(1 + (1-a) + (1-a)^2 + ... + (1-a)^n).
Parameters
----------
arr_in : np.ndarray, float64
A single dimenisional numpy array
window : int64
The decay window, or 'span'
Returns
-------
np.ndarray
The EWMA vector, same length / shape as ``arr_in``
Examples
--------
>>> import pandas as pd
>>> a = np.arange(5, dtype=float)
>>> exp = pd.DataFrame(a).ewm(span=10, adjust=True).mean()
>>> np.array_equal(_ewma_infinite_hist(a, 10), exp.values.ravel())
True
"""
n = arr_in.shape[0]
ewma = np.empty(n, dtype=float64)
alpha = 2 / float(window + 1)
w = 1
ewma_old = arr_in[0]
ewma[0] = ewma_old
for i in range(1, n):
w += (1-alpha)**i
ewma_old = ewma_old*(1-alpha) + arr_in[i]
ewma[i] = ewma_old / w
return ewma
@jit((float64[:], int64), nopython=True, nogil=True)
def _ewma_infinite_hist(arr_in, window):
r"""Exponentialy weighted moving average specified by a decay ``window``
assuming infinite history via the recursive form:
(2) (i) y[0] = x[0]; and
(ii) y[t] = a*x[t] + (1-a)*y[t-1] for t>0.
This method is less accurate that ``_ewma`` but
much faster:
In [1]: import numpy as np, bars
...: arr = np.random.random(100000)
...: %timeit bars._ewma(arr, 10)
...: %timeit bars._ewma_infinite_hist(arr, 10)
3.74 ms ± 60.2 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
262 µs ± 1.54 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
Parameters
----------
arr_in : np.ndarray, float64
A single dimenisional numpy array
window : int64
The decay window, or 'span'
Returns
-------
np.ndarray
The EWMA vector, same length / shape as ``arr_in``
Examples
--------
>>> import pandas as pd
>>> a = np.arange(5, dtype=float)
>>> exp = pd.DataFrame(a).ewm(span=10, adjust=False).mean()
>>> np.array_equal(_ewma_infinite_hist(a, 10), exp.values.ravel())
True
"""
n = arr_in.shape[0]
ewma = np.empty(n, dtype=float64)
alpha = 2 / float(window + 1)
ewma[0] = arr_in[0]
for i in range(1, n):
ewma[i] = arr_in[i] * alpha + ewma[i-1] * (1 - alpha)
return ewma
【讨论】:
如果我想处理 float32 对象,我看到来自 pandas 和基于 numba 的 ewm(您的代码)的.ewm().mean()
给出了不同的结果。有没有可能解决这个问题?
我认为是的。对于最小的工作示例,我执行以下操作:1)使用 dtype='float32'
创建 numpy 数组,2)使用 dtype=float
创建数组。然后比较 pandas .ewm().mean()
和基于 numba 的 ewm 的结果。对于float64
,相对差异为零,而对于float32
,它大约为~ 10^5
。
但我不明白:在我的研究中,我处理类型为 float32
的 numpy 数组,我天真地假设使用 float32
的基于 numba 的 ewm 就足够了。然而,这种期望被打破了。
我刚刚运行了上面In [57]
的最小示例。结果与 float32 完全匹配。我认为你的代码有错误。
好的,让我再试一次,因为这对我来说真的很重要,我希望这也足够有趣,让你花一些时间在这上面。我有 2 个基于 numba 的 ewm 函数: float64
和 float32
。我创建了两个数组:np.arange(1, 10**7)*10**(-12)
类型为 np.float32
和 np.float64
。然后,我计算这两个数组的相对差异 diff = (pd_ewm - numba_ewm)/(pd_ewm)
。在float64
的情况下,我看到差异为零,而在float32
的情况下,我看到差异大约是~10**5
。这对我来说似乎很奇怪。【参考方案8】:
这个答案似乎无关紧要。但是,对于那些还需要使用 NumPy 计算指数加权方差(以及标准差)的人,以下解决方案将很有用:
import numpy as np
def ew(a, alpha, winSize):
_alpha = 1 - alpha
ws = _alpha ** np.arange(winSize)
w_sum = ws.sum()
ew_mean = np.convolve(a, ws)[winSize - 1] / w_sum
bias = (w_sum ** 2) / ((w_sum ** 2) - (ws ** 2).sum())
ew_var = (np.convolve((a - ew_mean) ** 2, ws)[winSize - 1] / w_sum) * bias
ew_std = np.sqrt(ew_var)
return (ew_mean, ew_var, ew_std)
【讨论】:
【参考方案9】:这是 O 在此期间提出的另一个解决方案。它比 pandas 解决方案快四倍左右。
def numpy_ewma(data, window):
returnArray = np.empty((data.shape[0]))
returnArray.fill(np.nan)
e = data[0]
alpha = 2 / float(window + 1)
for s in range(data.shape[0]):
e = ((data[s]-e) *alpha ) + e
returnArray[s] = e
return returnArray
我以this formula 为起点。我相信这可以进一步改进,但这至少是一个起点。
【讨论】:
【参考方案10】:这是一个使用 NumPy 的实现,等效于使用 df.ewm(alpha=alpha).mean()
。阅读文档后,只是一些矩阵运算。诀窍在于构建正确的矩阵。
值得注意的是,由于我们正在创建浮点矩阵,因此如果输入数组太大,您可以快速消耗内存。
import pandas as pd
import numpy as np
def ewma(x, alpha):
'''
Returns the exponentially weighted moving average of x.
Parameters:
-----------
x : array-like
alpha : float 0 <= alpha <= 1
Returns:
--------
ewma: numpy array
the exponentially weighted moving average
'''
# Coerce x to an array
x = np.array(x)
n = x.size
# Create an initial weight matrix of (1-alpha), and a matrix of powers
# to raise the weights by
w0 = np.ones(shape=(n,n)) * (1-alpha)
p = np.vstack([np.arange(i,i-n,-1) for i in range(n)])
# Create the weight matrix
w = np.tril(w0**p,0)
# Calculate the ewma
return np.dot(w, x[::np.newaxis]) / w.sum(axis=1)
让我们测试一下:
alpha = 0.55
x = np.random.randint(0,30,15)
df = pd.DataFrame(x, columns=['A'])
df.ewm(alpha=alpha).mean()
# returns:
# A
# 0 13.000000
# 1 22.655172
# 2 20.443268
# 3 12.159796
# 4 14.871955
# 5 15.497575
# 6 20.743511
# 7 20.884818
# 8 24.250715
# 9 18.610901
# 10 17.174686
# 11 16.528564
# 12 17.337879
# 13 7.801912
# 14 12.310889
ewma(x=x, alpha=alpha)
# returns:
# array([ 13. , 22.65517241, 20.44326778, 12.1597964 ,
# 14.87195534, 15.4975749 , 20.74351117, 20.88481763,
# 24.25071484, 18.61090129, 17.17468551, 16.52856393,
# 17.33787888, 7.80191235, 12.31088889])
【讨论】:
您可以在此处更改的另一件事是,您可以在函数开头写alpha=2/(winsowSize+1)
,而不是提供alpha,然后像在pandas函数中一样提供windowSize
有没有其他方法可以通过避免 np.vstack 循环和 w = np.tril(w0**p,0)
来计算 p = np.vstack([np.arange(i,i-n,-1) for i in range(n)])
?与 pandas 解决方案相比,这个 np.vstack looping 和 np.tril 函数似乎使整个函数的执行速度极慢
@RaduS 只需将函数粘贴在一个类中并在初始化时计算一次权重矩阵。这应该适用于您将查看多个相同大小的窗口的情况。【参考方案11】:
我想我终于破解了!
这是numpy_ewma
函数的矢量化版本,据称可以从@RaduS's post
产生正确的结果-
def numpy_ewma_vectorized(data, window):
alpha = 2 /(window + 1.0)
alpha_rev = 1-alpha
scale = 1/alpha_rev
n = data.shape[0]
r = np.arange(n)
scale_arr = scale**r
offset = data[0]*alpha_rev**(r+1)
pw0 = alpha*alpha_rev**(n-1)
mult = data*pw0*scale_arr
cumsums = mult.cumsum()
out = offset + cumsums*scale_arr[::-1]
return out
进一步提升
我们可以通过一些代码重用来进一步提升它,就像这样 -
def numpy_ewma_vectorized_v2(data, window):
alpha = 2 /(window + 1.0)
alpha_rev = 1-alpha
n = data.shape[0]
pows = alpha_rev**(np.arange(n+1))
scale_arr = 1/pows[:-1]
offset = data[0]*pows[1:]
pw0 = alpha*alpha_rev**(n-1)
mult = data*pw0*scale_arr
cumsums = mult.cumsum()
out = offset + cumsums*scale_arr[::-1]
return out
运行时测试
让我们针对大型数据集的同一个循环函数对这两个函数进行计时。
In [97]: data = np.random.randint(2,9,(5000))
...: window = 20
...:
In [98]: np.allclose(numpy_ewma(data, window), numpy_ewma_vectorized(data, window))
Out[98]: True
In [99]: np.allclose(numpy_ewma(data, window), numpy_ewma_vectorized_v2(data, window))
Out[99]: True
In [100]: %timeit numpy_ewma(data, window)
100 loops, best of 3: 6.03 ms per loop
In [101]: %timeit numpy_ewma_vectorized(data, window)
1000 loops, best of 3: 665 µs per loop
In [102]: %timeit numpy_ewma_vectorized_v2(data, window)
1000 loops, best of 3: 357 µs per loop
In [103]: 6030/357.0
Out[103]: 16.89075630252101
大约有 17 倍的加速!
【讨论】:
如果输入向量太大,此代码不起作用。例如,ewma( np.ones(10000), span=10)
给出所有 nan
s。发生这种情况是因为在某些时候pows
变为全零,而scale_arr
变为全nan
s。
是的,可以确认@unsorted 所说的。在我的例子中,一个 120k 长的数据,每个元素的值在 22000 到 34000 之间,结果都是 nans【参考方案12】:
感谢@Divakar 的解决方案,这真的很快。但是,它确实会导致@Danny 指出的溢出问题。当长度大于 13835 左右时,该函数不会返回正确答案。
以下是我基于Divakar的解决方案和pandas.ewm().mean()的解决方案
def numpy_ema(data, com=None, span=None, halflife=None, alpha=None):
"""Summary
Calculate ema with automatically-generated alpha. Weight of past effect
decreases as the length of window increasing.
# these functions reproduce the pandas result when the flag adjust=False is set.
References:
https://***.com/questions/42869495/numpy-version-of-exponential-weighted-moving-average-equivalent-to-pandas-ewm
Args:
data (TYPE): Description
com (float, optional): Specify decay in terms of center of mass, alpha=1/(1+com), for com>=0
span (float, optional): Specify decay in terms of span, alpha=2/(span+1), for span>=1
halflife (float, optional): Specify decay in terms of half-life, alpha=1-exp(log(0.5)/halflife), for halflife>0
alpha (float, optional): Specify smoothing factor alpha directly, 0<alpha<=1
Returns:
TYPE: Description
Raises:
ValueError: Description
"""
n_input = sum(map(bool, [com, span, halflife, alpha]))
if n_input != 1:
raise ValueError(
'com, span, halflife, and alpha are mutually exclusive')
nrow = data.shape[0]
if np.isnan(data).any() or (nrow > 13835) or (data.ndim == 2):
df = pd.DataFrame(data)
df_ewm = df.ewm(com=com, span=span, halflife=halflife,
alpha=alpha, adjust=False)
out = df_ewm.mean().values.squeeze()
else:
if com:
alpha = 1 / (1 + com)
elif span:
alpha = 2 / (span + 1.0)
elif halflife:
alpha = 1 - np.exp(np.log(0.5) / halflife)
alpha_rev = 1 - alpha
pows = alpha_rev**(np.arange(nrow + 1))
scale_arr = 1 / pows[:-1]
offset = data[0] * pows[1:]
pw0 = alpha * alpha_rev**(nrow - 1)
mult = data * pw0 * scale_arr
cumsums = np.cumsum(mult)
out = offset + cumsums * scale_arr[::-1]
return out
【讨论】:
【参考方案13】:在Divakar 的出色答案的基础上,这里有一个对应于pandas 函数的adjust=True
标志的实现,即使用权重而不是递归。
def numpy_ewma(data, window):
alpha = 2 /(window + 1.0)
scale = 1/(1-alpha)
n = data.shape[0]
scale_arr = (1-alpha)**(-1*np.arange(n))
weights = (1-alpha)**np.arange(n)
pw0 = (1-alpha)**(n-1)
mult = data*pw0*scale_arr
cumsums = mult.cumsum()
out = cumsums*scale_arr[::-1] / weights.cumsum()
return out
【讨论】:
【参考方案14】:@Divakar 的回答似乎在处理时会造成溢出
numpy_ewma_vectorized(np.random.random(500000), 10)
我一直使用的是:
def EMA(input, time_period=10): # For time period = 10
t_ = time_period - 1
ema = np.zeros_like(input,dtype=float)
multiplier = 2.0 / (time_period + 1)
#multiplier = 1 - multiplier
for i in range(len(input)):
# Special Case
if i > t_:
ema[i] = (input[i] - ema[i-1]) * multiplier + ema[i-1]
else:
ema[i] = np.mean(input[:i+1])
return ema
但是,这比 panda 解决方案要慢:
from pandas import ewma as pd_ema
def EMA_fast(X, time_period = 10):
out = pd_ema(X, span=time_period, min_periods=time_period)
out[:time_period-1] = np.cumsum(X[:time_period-1]) / np.asarray(range(1,time_period))
return out
【讨论】:
【参考方案15】:给定alpha
和windowSize
,这是一种在 NumPy 上模拟相应行为的方法 -
def numpy_ewm_alpha(a, alpha, windowSize):
wghts = (1-alpha)**np.arange(windowSize)
wghts /= wghts.sum()
out = np.full(df.shape[0],np.nan)
out[windowSize-1:] = np.convolve(a,wghts,'valid')
return out
用于验证的样本运行 -
In [54]: alpha = 0.55
...: windowSize = 20
...:
In [55]: df = pd.DataFrame(np.random.randint(2,9,(100)))
In [56]: out0 = df.ewm(alpha = alpha, min_periods=windowSize).mean().as_matrix().ravel()
...: out1 = numpy_ewm_alpha(df.values.ravel(), alpha = alpha, windowSize = windowSize)
...: print "Max. error : " + str(np.nanmax(np.abs(out0 - out1)))
...:
Max. error : 5.10531254605e-07
In [57]: alpha = 0.75
...: windowSize = 30
...:
In [58]: out0 = df.ewm(alpha = alpha, min_periods=windowSize).mean().as_matrix().ravel()
...: out1 = numpy_ewm_alpha(df.values.ravel(), alpha = alpha, windowSize = windowSize)
...: print "Max. error : " + str(np.nanmax(np.abs(out0 - out1)))
Max. error : 8.881784197e-16
在更大的数据集上进行运行时测试 -
In [61]: alpha = 0.55
...: windowSize = 20
...:
In [62]: df = pd.DataFrame(np.random.randint(2,9,(10000)))
In [63]: %timeit df.ewm(alpha = alpha, min_periods=windowSize).mean()
1000 loops, best of 3: 851 µs per loop
In [64]: %timeit numpy_ewm_alpha(df.values.ravel(), alpha = alpha, windowSize = windowSize)
1000 loops, best of 3: 204 µs per loop
进一步提升
为了进一步提升性能,我们可以避免使用 NaN 进行初始化,而是使用从 np.convolve
输出的数组,就像这样 -
def numpy_ewm_alpha_v2(a, alpha, windowSize):
wghts = (1-alpha)**np.arange(windowSize)
wghts /= wghts.sum()
out = np.convolve(a,wghts)
out[:windowSize-1] = np.nan
return out[:a.size]
时间安排 -
In [117]: alpha = 0.55
...: windowSize = 20
...:
In [118]: df = pd.DataFrame(np.random.randint(2,9,(10000)))
In [119]: %timeit numpy_ewm_alpha(df.values.ravel(), alpha = alpha, windowSize = windowSize)
1000 loops, best of 3: 204 µs per loop
In [120]: %timeit numpy_ewm_alpha_v2(df.values.ravel(), alpha = alpha, windowSize = windowSize)
10000 loops, best of 3: 195 µs per loop
【讨论】:
您可以查看我在下面发布的解决方案,也许它会激发一些想法。该解决方案根据 ewma 的基本公式给出了正确的结果,唯一的问题是循环以上是关于NumPy 版本的“指数加权移动平均线”,相当于 pandas.ewm().mean()的主要内容,如果未能解决你的问题,请参考以下文章