M×N 形状 numpy.ndarray 的滑动窗口

Posted

技术标签:

【中文标题】M×N 形状 numpy.ndarray 的滑动窗口【英文标题】:Sliding window of M-by-N shape numpy.ndarray 【发布时间】:2013-03-21 06:44:20 【问题描述】:

我有一个形状为 (6,2) 的 Numpy 数组:

[[ 0, 1],
 [10,11],
 [20,21],
 [30,31],
 [40,41],
 [50,51]]

我需要一个滑动窗口,其步长为1,窗口大小为3,如下所示:

[[ 0, 1,10,11,20,21],
 [10,11,20,21,30,31],
 [20,21,30,31,40,41],
 [30,31,40,41,50,51]]

我正在寻找一个 Numpy 解决方案。如果您的解决方案可以参数化原始数组的形状以及窗口大小和步长,那就太好了。


我找到了这个相关的答案Using strides for an efficient moving average filter,但我看不到如何在那里指定步长以及如何将窗口从 3d 折叠到连续的 2d 数组。还有这个Rolling or sliding window iterator?,但那是在Python中,我不确定它的效率如何。此外,它支持元素,但如果每个元素都有多个特征,它最终不会将它们连接在一起。

【问题讨论】:

gist.github.com/seberg/3866040 numpy 的多维 rolling_window 我更改了标题以明确这不是***.com/q/13728392/52074的重复项 【参考方案1】:

您可以使用精美的索引在 numpy 中创建矢量化滑动窗口。

>>> import numpy as np

>>> a = np.array([[00,01], [10,11], [20,21], [30,31], [40,41], [50,51]])

>>> a
array([[ 0,  1],
       [10, 11],
       [20, 21],                      #define our 2d numpy array
       [30, 31],
       [40, 41],
       [50, 51]])

>>> a = a.flatten()

>>> a
array([ 0,  1, 10, 11, 20, 21, 30, 31, 40, 41, 50, 51])    #flattened numpy array

>>> indexer = np.arange(6)[None, :] + 2*np.arange(4)[:, None]

>>> indexer
array([[ 0,  1,  2,  3,  4,  5],
       [ 2,  3,  4,  5,  6,  7],            #sliding window indices
       [ 4,  5,  6,  7,  8,  9],
       [ 6,  7,  8,  9, 10, 11]])

>>> a[indexer]
array([[ 0,  1, 10, 11, 20, 21],
       [10, 11, 20, 21, 30, 31],            #values of a over sliding window
       [20, 21, 30, 31, 40, 41],
       [30, 31, 40, 41, 50, 51]])

>>> np.sum(a[indexer], axis=1)
array([ 63, 123, 183, 243])         #sum of values in 'a' under the sliding window.

解释这段代码在做什么。

np.arange(6)[None, :] 创建一个从 0 到 6 的行向量,np.arange(4)[:, None] 创建一个从 0 到 4 的列向量。这会产生一个 4x6 矩阵,其中每行(其中六个)代表一个窗口,行数(其中四个)代表窗口的数量。 2 的倍数使滑动窗口一次滑动 2 个单位,这是在每个元组上滑动所必需的。使用 numpy 数组切片,您可以将滑动窗口传递到展平的 numpy 数组中,并像 sum 一样对它们进行聚合。

【讨论】:

这应该是正确的答案。我希望我能给你更多的支持。 也可以写成indexer = np.arange(6).reshape(1, -1) + 2 * np.arange(4).reshape(-1, 1) ...我发现它比[None, :] 符号更熟悉。【参考方案2】:
In [1]: import numpy as np

In [2]: a = np.array([[00,01], [10,11], [20,21], [30,31], [40,41], [50,51]])

In [3]: w = np.hstack((a[:-2],a[1:-1],a[2:]))

In [4]: w
Out[4]: 
array([[ 0,  1, 10, 11, 20, 21],
       [10, 11, 20, 21, 30, 31],
       [20, 21, 30, 31, 40, 41],
       [30, 31, 40, 41, 50, 51]])

你可以把它写成这样的函数:

def window_stack(a, stepsize=1, width=3):
    n = a.shape[0]
    return np.hstack( a[i:1+n+i-width:stepsize] for i in range(0,width) )

这并不真正取决于原始数组的形状,只要a.ndim = 2。请注意,我从不在交互式版本中使用任何一种长度。形状的第二维无关紧要;每一行可以任意长。感谢@Jaime 的建议,您可以完全不检查形状:

def window_stack(a, stepsize=1, width=3):
    return np.hstack( a[i:1+i-width or None:stepsize] for i in range(0,width) )

【讨论】:

已修复。我在那里有 +1,但后来在另一个编辑中删除了它。添加了与此相关的评论。 对于 [:-i] 不起作用的东西,我看到 [:-i or None] 使用过。 没错,我的解决方案是在hstackvstack 之间切换,我会检查您的解决方案! @loretoparisi,它应该可以在没有太大变化的情况下工作:首先替换对 np.hstack( ... ) 的调用并使用列表理解:[ ... ]。如果你需要转置它,你可能需要一个zip 这段代码现在产生FutureWarning: arrays to stack must be passed as a "sequence" type such as list or tuple. Support for non-sequence iterables such as generators is deprecated as of NumPy 1.16 and will raise an error in the future. 应该用括号将arg 包围到np.hstack【参考方案3】:

一个解决方案是

np.lib.stride_tricks.as_strided(a, shape=(4,6), strides=(8,4)).

当您开始考虑指针/地址时,使用 strides 很直观。

as_strided() 方法有 3 个参数。

    数据 形状 大步前进

data 是我们要操作的数组。

要使用as_strided() 实现滑动窗口函数,我们必须事先计算输出的形状。在问题中,(4,6) 是输出的形状。如果尺寸不正确,我们最终会读取垃圾值。这是因为我们通过将指针移动几个字节(取决于数据类型)来访问数据。

确定strides 的正确值对于获得预期结果至关重要。 在计算步长之前,使用arr.strides[-1] 找出每个元素占用的内存。在本例中,一个元素占用的内存为 4 个字节。 Numpy 数组以行主要方式创建。下一行的第一个元素紧邻当前行的最后一个元素。

例如:

0 , 1 | 10, 11 | ...

10 就在 1 旁边。

想象一下将 2D 数组重新整形为 1D(这是可以接受的,因为数据以行优先格式存储)。输出中每一行的第一个元素是一维数组中的奇数索引元素。

0, 10, 20, 30, ..

因此,从 0 到 10、10 到 20 等等,我们需要在内存中执行的步数是2 * mem size of element。每行的步幅为2 * 4bytes = 8。 对于输出中的给定行,所有元素在我们想象的一维数组中彼此相邻。要获取一行中的下一个元素,只需迈出一个等于元素大小的步幅。 column stride的值为4字节。

因此,strides=(8,4)

另一种解释: 输出的形状为 (4,6)。列步幅4。因此,第一行元素从索引 0 开始,并且有 6 个元素,每个元素间隔 4 个字节。 收集完第一行后,第二行开始距离当前行开头 8 个字节。第三行开始距离第二行起始点 8 个字节,依此类推。

形状决定了我们需要的行数和列数。 strides 定义了开始一行并收集列元素的内存步骤

【讨论】:

请注意,如果省略第三个参数,则 strides 值将从作为第一个参数传入的数组中获取。这样您就不必自己解决这个问题了。【参考方案4】:

使用more_itertools.windowed1 可以理解简短列表:

给定

import numpy as np
import more_itertools as mit


a = [["00","01"],
     ["10","11"],
     ["20","21"],
     ["30","31"],
     ["40","41"],
     ["50","51"]]

b = np.array(a)

代码

np.array([list(mit.flatten(w)) for w in mit.windowed(a, n=3)])

np.array([[i for item in w for i in item] for w in mit.windowed(a, n=3)])

np.array(list(mit.windowed(b.ravel(), n=6)))

输出

array([['00', '01', '10', '11', '20', '21'],
       ['10', '11', '20', '21', '30', '31'],
       ['20', '21', '30', '31', '40', '41'],
       ['30', '31', '40', '41', '50', '51']], 
      dtype='<U2')

大小为n=3 的滑动窗口被创建并展平。注意默认步长为more_itertools.windowed(..., step=1)


性能

作为一个数组,接受的答案是最快的。

%timeit np.hstack((a[:-2], a[1:-1], a[2:]))
# 37.5 µs ± 1.88 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
%timeit np.hstack((b[:-2], b[1:-1], b[2:]))
# 12.9 µs ± 166 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

%timeit np.array([list(mit.flatten(w)) for w in mit.windowed(a, n=3)])
# 23.2 µs ± 1.73 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
%timeit np.array([[i for item in w for i in item] for w in mit.windowed(a, n=3)])
# 21.2 µs ± 999 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
%timeit np.array(list(mit.windowed(b.ravel(), n=6)))
# 43.4 µs ± 374 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

实现itertool recipes 和许多有用工具的第三方库。

【讨论】:

【参考方案5】:

Numpy 1.20开始,使用新的sliding_window_view滑动/滚动元素窗口,基于与user42541's answer相同的思路,我们可以这样做:

import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

# values = np.array([[0,1], [10,11], [20,21], [30,31], [40,41], [50,51]])
sliding_window_view(values.flatten(), window_shape = 2*3)[::2]
# array([[ 0,  1, 10, 11, 20, 21],
#        [10, 11, 20, 21, 30, 31],
#        [20, 21, 30, 31, 40, 41],
#        [30, 31, 40, 41, 50, 51]])

其中2 是子数组的大小,3 是窗口。


中间步骤详情:

# values = np.array([[0,1], [10,11], [20,21], [30,31], [40,41], [50,51]])

# Flatten the array (concatenate sub-arrays):
values.flatten()
# array([ 0,  1, 10, 11, 20, 21, 30, 31, 40, 41, 50, 51])

# Slide through windows of size 2*3=6:
sliding_window_view(values.flatten(), 2*3)
# array([[ 0,  1, 10, 11, 20, 21],
#        [ 1, 10, 11, 20, 21, 30],
#        [10, 11, 20, 21, 30, 31],
#        [11, 20, 21, 30, 31, 40],
#        [20, 21, 30, 31, 40, 41],
#        [21, 30, 31, 40, 41, 50],
#        [30, 31, 40, 41, 50, 51]])

# Only keep even rows (1 row in 2 - if sub-arrays have a size of x, then replace 2 with x):
sliding_window_view(values.flatten(), 2*3)[::2]
# array([[ 0,  1, 10, 11, 20, 21],
#        [10, 11, 20, 21, 30, 31],
#        [20, 21, 30, 31, 40, 41],
#        [30, 31, 40, 41, 50, 51]])

【讨论】:

【参考方案6】:

从 NumPy 版本 1.20.0 开始,可以使用

np.lib.stride_tricks.sliding_window_view(arr, winsize)

例子:

>>> arr = np.arange(0, 9).reshape((3, 3))
>>> np.lib.stride_tricks.sliding_window_view(arr, (2, 2))

array([[[[0, 1],
         [3, 4]],

        [[1, 2],
         [4, 5]]],


       [[[3, 4],
         [6, 7]],

        [[4, 5],
         [7, 8]]]])

您可以阅读更多关于它的信息here。

【讨论】:

【参考方案7】:

这里是使用 Numpy >= v1.17 的 One-liner

rowsJoined = 3

splits = np.vstack(np.split(x,np.array([[i, i + rowsJoined] for i in range(x.shape[0] - (rowsJoined - 1))]).reshape(-1))).reshape(-1, rowsJoined * x.shape[1]) 

测试

x = np.array([[00,1],
              [10,11],
              [20,21],
              [30,31],
              [40,41],
              [50,51]])

结果

[[ 0  1 10 11 20 21]
 [10 11 20 21 30 31]
 [20 21 30 31 40 41]
 [30 31 40 41 50 51]]

在大型阵列上测试性能

import numpy as np
import time

x = np.array(range(1000)).reshape(-1, 2)
rowsJoined = 3

all_t = 0.
for i in range(1000):
    start_ = time.time()
    np.vstack(
        numpy.split(x,np.array([[i, i + rowsJoined] for i in range(x.shape[0] - (rowsJoined - 1))])
                    .reshape(-1))).reshape(-1, rowsJoined * x.shape[1])
    all_t += time.time() - start_

print('Average Time of 1000 Iterations on Array of Shape '
      '1000 x 2 is:  Seconds.'.format(all_t/1000.))

性能结果

Average Time of 1000 Iterations on Array of Shape 1000 x 2 is: 0.0016909 Seconds.

【讨论】:

【参考方案8】:

这是一个纯 Python 实现:

def sliding_window(arr, window=3):
    i = iter(arr)
    a = []
    for e in range(0, window): a.append(next(i))
    yield a
    for e in i:
        a = a[1:] + [e]
        yield a

一个例子:

# flatten array
flatten = lambda l: [item for sublist in l for item in sublist]

a = [[0,1], [10,11], [20,21], [30,31], [40,41], [50,51]]
w = sliding_window(a, width=3)
print( list(map(flatten,w)) )

[[0, 1, 10, 11, 20, 21], [10, 11, 20, 21, 30, 31], [20, 21, 30, 31, 40, 41], [30, 31, 40, 41, 50, 51]]

基准测试

import timeit
def benchmark():
  a = [[0,1], [10,11], [20,21], [30,31], [40,41], [50,51]]
  sliding_window(a, width=3)

times = timeit.Timer(benchmark).repeat(3, number=1000)
time_taken = min(times) / 1000
print(time_taken)

1.0944640007437556e-06

【讨论】:

以上是关于M×N 形状 numpy.ndarray 的滑动窗口的主要内容,如果未能解决你的问题,请参考以下文章

利用Python中的numpy.ndarray.reshape()对阵列形状进行调整

numpy.ndarray 如何标准化?

Python Pandas:“numpy.ndarray”对象没有属性“apply”

沿轴插入 numpy.ndarray 的最佳方法

Numpy Ndarray 对象

获取二维 numpy ndarray 或 numpy 矩阵中前 N 个值的索引