M×N 形状 numpy.ndarray 的滑动窗口
Posted
技术标签:
【中文标题】M×N 形状 numpy.ndarray 的滑动窗口【英文标题】:Sliding window of M-by-N shape numpy.ndarray 【发布时间】:2013-03-21 06:44:20 【问题描述】:我有一个形状为 (6,2) 的 Numpy 数组:
[[ 0, 1],
[10,11],
[20,21],
[30,31],
[40,41],
[50,51]]
我需要一个滑动窗口,其步长为1
,窗口大小为3
,如下所示:
[[ 0, 1,10,11,20,21],
[10,11,20,21,30,31],
[20,21,30,31,40,41],
[30,31,40,41,50,51]]
我正在寻找一个 Numpy 解决方案。如果您的解决方案可以参数化原始数组的形状以及窗口大小和步长,那就太好了。
我找到了这个相关的答案Using strides for an efficient moving average filter,但我看不到如何在那里指定步长以及如何将窗口从 3d 折叠到连续的 2d 数组。还有这个Rolling or sliding window iterator?,但那是在Python中,我不确定它的效率如何。此外,它支持元素,但如果每个元素都有多个特征,它最终不会将它们连接在一起。
【问题讨论】:
gist.github.com/seberg/3866040 numpy 的多维 rolling_window 我更改了标题以明确这不是***.com/q/13728392/52074的重复项 【参考方案1】:您可以使用精美的索引在 numpy 中创建矢量化滑动窗口。
>>> import numpy as np
>>> a = np.array([[00,01], [10,11], [20,21], [30,31], [40,41], [50,51]])
>>> a
array([[ 0, 1],
[10, 11],
[20, 21], #define our 2d numpy array
[30, 31],
[40, 41],
[50, 51]])
>>> a = a.flatten()
>>> a
array([ 0, 1, 10, 11, 20, 21, 30, 31, 40, 41, 50, 51]) #flattened numpy array
>>> indexer = np.arange(6)[None, :] + 2*np.arange(4)[:, None]
>>> indexer
array([[ 0, 1, 2, 3, 4, 5],
[ 2, 3, 4, 5, 6, 7], #sliding window indices
[ 4, 5, 6, 7, 8, 9],
[ 6, 7, 8, 9, 10, 11]])
>>> a[indexer]
array([[ 0, 1, 10, 11, 20, 21],
[10, 11, 20, 21, 30, 31], #values of a over sliding window
[20, 21, 30, 31, 40, 41],
[30, 31, 40, 41, 50, 51]])
>>> np.sum(a[indexer], axis=1)
array([ 63, 123, 183, 243]) #sum of values in 'a' under the sliding window.
解释这段代码在做什么。
np.arange(6)[None, :]
创建一个从 0 到 6 的行向量,np.arange(4)[:, None]
创建一个从 0 到 4 的列向量。这会产生一个 4x6 矩阵,其中每行(其中六个)代表一个窗口,行数(其中四个)代表窗口的数量。 2 的倍数使滑动窗口一次滑动 2 个单位,这是在每个元组上滑动所必需的。使用 numpy 数组切片,您可以将滑动窗口传递到展平的 numpy 数组中,并像 sum 一样对它们进行聚合。
【讨论】:
这应该是正确的答案。我希望我能给你更多的支持。 也可以写成indexer = np.arange(6).reshape(1, -1) + 2 * np.arange(4).reshape(-1, 1)
...我发现它比[None, :]
符号更熟悉。【参考方案2】:
In [1]: import numpy as np
In [2]: a = np.array([[00,01], [10,11], [20,21], [30,31], [40,41], [50,51]])
In [3]: w = np.hstack((a[:-2],a[1:-1],a[2:]))
In [4]: w
Out[4]:
array([[ 0, 1, 10, 11, 20, 21],
[10, 11, 20, 21, 30, 31],
[20, 21, 30, 31, 40, 41],
[30, 31, 40, 41, 50, 51]])
你可以把它写成这样的函数:
def window_stack(a, stepsize=1, width=3):
n = a.shape[0]
return np.hstack( a[i:1+n+i-width:stepsize] for i in range(0,width) )
这并不真正取决于原始数组的形状,只要a.ndim = 2
。请注意,我从不在交互式版本中使用任何一种长度。形状的第二维无关紧要;每一行可以任意长。感谢@Jaime 的建议,您可以完全不检查形状:
def window_stack(a, stepsize=1, width=3):
return np.hstack( a[i:1+i-width or None:stepsize] for i in range(0,width) )
【讨论】:
已修复。我在那里有 +1,但后来在另一个编辑中删除了它。添加了与此相关的评论。 对于[:-i]
不起作用的东西,我看到 [:-i or None]
使用过。
没错,我的解决方案是在hstack
和vstack
之间切换,我会检查您的解决方案!
@loretoparisi,它应该可以在没有太大变化的情况下工作:首先替换对 np.hstack( ... )
的调用并使用列表理解:[ ... ]
。如果你需要转置它,你可能需要一个zip
。
这段代码现在产生FutureWarning: arrays to stack must be passed as a "sequence" type such as list or tuple. Support for non-sequence iterables such as generators is deprecated as of NumPy 1.16 and will raise an error in the future.
应该用括号将arg 包围到np.hstack
。【参考方案3】:
一个解决方案是
np.lib.stride_tricks.as_strided(a, shape=(4,6), strides=(8,4))
.
当您开始考虑指针/地址时,使用 strides 很直观。
as_strided()
方法有 3 个参数。
-
数据
形状
大步前进
data
是我们要操作的数组。
要使用as_strided()
实现滑动窗口函数,我们必须事先计算输出的形状。在问题中,(4,6) 是输出的形状。如果尺寸不正确,我们最终会读取垃圾值。这是因为我们通过将指针移动几个字节(取决于数据类型)来访问数据。
确定strides
的正确值对于获得预期结果至关重要。
在计算步长之前,使用arr.strides[-1]
找出每个元素占用的内存。在本例中,一个元素占用的内存为 4 个字节。
Numpy 数组以行主要方式创建。下一行的第一个元素紧邻当前行的最后一个元素。
例如:
0 , 1 | 10, 11 | ...
10 就在 1 旁边。
想象一下将 2D 数组重新整形为 1D(这是可以接受的,因为数据以行优先格式存储)。输出中每一行的第一个元素是一维数组中的奇数索引元素。
0, 10, 20, 30, ..
因此,从 0 到 10、10 到 20 等等,我们需要在内存中执行的步数是2 * mem size of element
。每行的步幅为2 * 4bytes = 8
。
对于输出中的给定行,所有元素在我们想象的一维数组中彼此相邻。要获取一行中的下一个元素,只需迈出一个等于元素大小的步幅。 column stride的值为4字节。
因此,strides=(8,4)
另一种解释:
输出的形状为 (4,6)。列步幅4
。因此,第一行元素从索引 0
开始,并且有 6 个元素,每个元素间隔 4 个字节。
收集完第一行后,第二行开始距离当前行开头 8 个字节。第三行开始距离第二行起始点 8 个字节,依此类推。
形状决定了我们需要的行数和列数。 strides 定义了开始一行并收集列元素的内存步骤
【讨论】:
请注意,如果省略第三个参数,则strides
值将从作为第一个参数传入的数组中获取。这样您就不必自己解决这个问题了。【参考方案4】:
使用more_itertools.windowed
1 可以理解简短列表:
给定
import numpy as np
import more_itertools as mit
a = [["00","01"],
["10","11"],
["20","21"],
["30","31"],
["40","41"],
["50","51"]]
b = np.array(a)
代码
np.array([list(mit.flatten(w)) for w in mit.windowed(a, n=3)])
或
np.array([[i for item in w for i in item] for w in mit.windowed(a, n=3)])
或
np.array(list(mit.windowed(b.ravel(), n=6)))
输出
array([['00', '01', '10', '11', '20', '21'],
['10', '11', '20', '21', '30', '31'],
['20', '21', '30', '31', '40', '41'],
['30', '31', '40', '41', '50', '51']],
dtype='<U2')
大小为n=3
的滑动窗口被创建并展平。注意默认步长为more_itertools.windowed(..., step=1)
。
性能
作为一个数组,接受的答案是最快的。
%timeit np.hstack((a[:-2], a[1:-1], a[2:]))
# 37.5 µs ± 1.88 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
%timeit np.hstack((b[:-2], b[1:-1], b[2:]))
# 12.9 µs ± 166 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
%timeit np.array([list(mit.flatten(w)) for w in mit.windowed(a, n=3)])
# 23.2 µs ± 1.73 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
%timeit np.array([[i for item in w for i in item] for w in mit.windowed(a, n=3)])
# 21.2 µs ± 999 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
%timeit np.array(list(mit.windowed(b.ravel(), n=6)))
# 43.4 µs ± 374 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
实现itertool recipes 和许多有用工具的第三方库。
【讨论】:
【参考方案5】:从Numpy 1.20
开始,使用新的sliding_window_view
滑动/滚动元素窗口,基于与user42541's answer相同的思路,我们可以这样做:
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view
# values = np.array([[0,1], [10,11], [20,21], [30,31], [40,41], [50,51]])
sliding_window_view(values.flatten(), window_shape = 2*3)[::2]
# array([[ 0, 1, 10, 11, 20, 21],
# [10, 11, 20, 21, 30, 31],
# [20, 21, 30, 31, 40, 41],
# [30, 31, 40, 41, 50, 51]])
其中2
是子数组的大小,3
是窗口。
中间步骤详情:
# values = np.array([[0,1], [10,11], [20,21], [30,31], [40,41], [50,51]])
# Flatten the array (concatenate sub-arrays):
values.flatten()
# array([ 0, 1, 10, 11, 20, 21, 30, 31, 40, 41, 50, 51])
# Slide through windows of size 2*3=6:
sliding_window_view(values.flatten(), 2*3)
# array([[ 0, 1, 10, 11, 20, 21],
# [ 1, 10, 11, 20, 21, 30],
# [10, 11, 20, 21, 30, 31],
# [11, 20, 21, 30, 31, 40],
# [20, 21, 30, 31, 40, 41],
# [21, 30, 31, 40, 41, 50],
# [30, 31, 40, 41, 50, 51]])
# Only keep even rows (1 row in 2 - if sub-arrays have a size of x, then replace 2 with x):
sliding_window_view(values.flatten(), 2*3)[::2]
# array([[ 0, 1, 10, 11, 20, 21],
# [10, 11, 20, 21, 30, 31],
# [20, 21, 30, 31, 40, 41],
# [30, 31, 40, 41, 50, 51]])
【讨论】:
【参考方案6】:从 NumPy 版本 1.20.0
开始,可以使用
np.lib.stride_tricks.sliding_window_view(arr, winsize)
例子:
>>> arr = np.arange(0, 9).reshape((3, 3))
>>> np.lib.stride_tricks.sliding_window_view(arr, (2, 2))
array([[[[0, 1],
[3, 4]],
[[1, 2],
[4, 5]]],
[[[3, 4],
[6, 7]],
[[4, 5],
[7, 8]]]])
您可以阅读更多关于它的信息here。
【讨论】:
【参考方案7】:这里是使用 Numpy >= v1.17 的 One-liner
rowsJoined = 3
splits = np.vstack(np.split(x,np.array([[i, i + rowsJoined] for i in range(x.shape[0] - (rowsJoined - 1))]).reshape(-1))).reshape(-1, rowsJoined * x.shape[1])
测试
x = np.array([[00,1],
[10,11],
[20,21],
[30,31],
[40,41],
[50,51]])
结果
[[ 0 1 10 11 20 21]
[10 11 20 21 30 31]
[20 21 30 31 40 41]
[30 31 40 41 50 51]]
在大型阵列上测试性能
import numpy as np
import time
x = np.array(range(1000)).reshape(-1, 2)
rowsJoined = 3
all_t = 0.
for i in range(1000):
start_ = time.time()
np.vstack(
numpy.split(x,np.array([[i, i + rowsJoined] for i in range(x.shape[0] - (rowsJoined - 1))])
.reshape(-1))).reshape(-1, rowsJoined * x.shape[1])
all_t += time.time() - start_
print('Average Time of 1000 Iterations on Array of Shape '
'1000 x 2 is: Seconds.'.format(all_t/1000.))
性能结果
Average Time of 1000 Iterations on Array of Shape 1000 x 2 is: 0.0016909 Seconds.
【讨论】:
【参考方案8】:这是一个纯 Python 实现:
def sliding_window(arr, window=3):
i = iter(arr)
a = []
for e in range(0, window): a.append(next(i))
yield a
for e in i:
a = a[1:] + [e]
yield a
一个例子:
# flatten array
flatten = lambda l: [item for sublist in l for item in sublist]
a = [[0,1], [10,11], [20,21], [30,31], [40,41], [50,51]]
w = sliding_window(a, width=3)
print( list(map(flatten,w)) )
[[0, 1, 10, 11, 20, 21], [10, 11, 20, 21, 30, 31], [20, 21, 30, 31, 40, 41], [30, 31, 40, 41, 50, 51]]
基准测试
import timeit
def benchmark():
a = [[0,1], [10,11], [20,21], [30,31], [40,41], [50,51]]
sliding_window(a, width=3)
times = timeit.Timer(benchmark).repeat(3, number=1000)
time_taken = min(times) / 1000
print(time_taken)
1.0944640007437556e-06
【讨论】:
以上是关于M×N 形状 numpy.ndarray 的滑动窗口的主要内容,如果未能解决你的问题,请参考以下文章
利用Python中的numpy.ndarray.reshape()对阵列形状进行调整