如何使用 numpy 在二维数组上执行最大/均值池化
Posted
技术标签:
【中文标题】如何使用 numpy 在二维数组上执行最大/均值池化【英文标题】:how to perform max/mean pooling on a 2d array using numpy 【发布时间】:2017-07-16 17:36:25 【问题描述】:给定一个 2D(M x N) 矩阵和一个 2D Kernel(K x L),我如何返回一个矩阵,它是在图像上使用给定核进行最大或均值池化的结果?
如果可能,我想使用 numpy。
注意:M、N、K、L 既可以是偶数也可以是奇数,它们不需要完全可被彼此整除,例如:7x5 矩阵和 2x2 核。
例如最大池化:
matrix:
array([[ 20, 200, -5, 23],
[ -13, 134, 119, 100],
[ 120, 32, 49, 25],
[-120, 12, 09, 23]])
kernel: 2 x 2
soln:
array([[ 200, 119],
[ 120, 49]])
【问题讨论】:
【参考方案1】:你可以使用 scikit-image block_reduce:
import numpy as np
import skimage.measure
a = np.array([
[ 20, 200, -5, 23],
[ -13, 134, 119, 100],
[ 120, 32, 49, 25],
[-120, 12, 9, 23]
])
skimage.measure.block_reduce(a, (2,2), np.max)
给予:
array([[200, 119],
[120, 49]])
【讨论】:
这是一个很好的答案,如果您只需要按缩小尺寸大步前进的话。此 API 不允许(还)修改您的步幅(遗憾的是)。赞成:-)【参考方案2】:如果图像大小可以被内核大小整除,您可以重新整形数组并根据需要使用max
或mean
import numpy as np
mat = np.array([[ 20, 200, -5, 23],
[ -13, 134, 119, 100],
[ 120, 32, 49, 25],
[-120, 12, 9, 23]])
M, N = mat.shape
K = 2
L = 2
MK = M // K
NL = N // L
print(mat[:MK*K, :NL*L].reshape(MK, K, NL, L).max(axis=(1, 3)))
# [[200, 119], [120, 49]]
如果您没有偶数个内核,则必须单独处理边界。 (正如 cmets 中所指出的,这会导致矩阵被复制,这会影响性能)。
mat = np.array([[20, 200, -5, 23, 7],
[-13, 134, 119, 100, 8],
[120, 32, 49, 25, 12],
[-120, 12, 9, 23, 15],
[-57, 84, 19, 17, 82],
])
# soln
# [200, 119, 8]
# [120, 49, 15]
# [84, 19, 82]
M, N = mat.shape
K = 2
L = 2
MK = M // K
NL = N // L
# split the matrix into 'quadrants'
Q1 = mat[:MK * K, :NL * L].reshape(MK, K, NL, L).max(axis=(1, 3))
Q2 = mat[MK * K:, :NL * L].reshape(-1, NL, L).max(axis=2)
Q3 = mat[:MK * K, NL * L:].reshape(MK, K, -1).max(axis=1)
Q4 = mat[MK * K:, NL * L:].max()
# compose the individual quadrants into one new matrix
soln = np.vstack([np.c_[Q1, Q3], np.c_[Q2, Q4]])
print(soln)
# [[200 119 8]
# [120 49 15]
# [ 84 19 82]]
【讨论】:
M, N = mat.shape
在这里会更清楚。您还应该指出,即使内核不划分源,但丢弃边界(并产生副本),您的答案仍然有效。【参考方案3】:
我们可以填充它以使其均匀整除,而不是像 Elliot 的回答那样制作“象限”,然后执行最大或均值池化。
由于 CNN 中经常使用池化,因此输入数组通常是 3D。所以我制作了一个适用于 2D 或 3D 数组的函数。
def pooling(mat,ksize,method='max',pad=False):
'''Non-overlapping pooling on 2D or 3D data.
<mat>: ndarray, input array to pool.
<ksize>: tuple of 2, kernel size in (ky, kx).
<method>: str, 'max for max-pooling,
'mean' for mean-pooling.
<pad>: bool, pad <mat> or not. If no pad, output has size
n//f, n being <mat> size, f being kernel size.
if pad, output has size ceil(n/f).
Return <result>: pooled matrix.
'''
m, n = mat.shape[:2]
ky,kx=ksize
_ceil=lambda x,y: int(numpy.ceil(x/float(y)))
if pad:
ny=_ceil(m,ky)
nx=_ceil(n,kx)
size=(ny*ky, nx*kx)+mat.shape[2:]
mat_pad=numpy.full(size,numpy.nan)
mat_pad[:m,:n,...]=mat
else:
ny=m//ky
nx=n//kx
mat_pad=mat[:ny*ky, :nx*kx, ...]
new_shape=(ny,ky,nx,kx)+mat.shape[2:]
if method=='max':
result=numpy.nanmax(mat_pad.reshape(new_shape),axis=(1,3))
else:
result=numpy.nanmean(mat_pad.reshape(new_shape),axis=(1,3))
return result
有时您可能希望以不等于内核大小的步幅执行重叠池。这是一个可以做到这一点的函数,有或没有填充:
def asStride(arr,sub_shape,stride):
'''Get a strided sub-matrices view of an ndarray.
See also skimage.util.shape.view_as_windows()
'''
s0,s1=arr.strides[:2]
m1,n1=arr.shape[:2]
m2,n2=sub_shape
view_shape=(1+(m1-m2)//stride[0],1+(n1-n2)//stride[1],m2,n2)+arr.shape[2:]
strides=(stride[0]*s0,stride[1]*s1,s0,s1)+arr.strides[2:]
subs=numpy.lib.stride_tricks.as_strided(arr,view_shape,strides=strides)
return subs
def poolingOverlap(mat,ksize,stride=None,method='max',pad=False):
'''Overlapping pooling on 2D or 3D data.
<mat>: ndarray, input array to pool.
<ksize>: tuple of 2, kernel size in (ky, kx).
<stride>: tuple of 2 or None, stride of pooling window.
If None, same as <ksize> (non-overlapping pooling).
<method>: str, 'max for max-pooling,
'mean' for mean-pooling.
<pad>: bool, pad <mat> or not. If no pad, output has size
(n-f)//s+1, n being <mat> size, f being kernel size, s stride.
if pad, output has size ceil(n/s).
Return <result>: pooled matrix.
'''
m, n = mat.shape[:2]
ky,kx=ksize
if stride is None:
stride=(ky,kx)
sy,sx=stride
_ceil=lambda x,y: int(numpy.ceil(x/float(y)))
if pad:
ny=_ceil(m,sy)
nx=_ceil(n,sx)
size=((ny-1)*sy+ky, (nx-1)*sx+kx) + mat.shape[2:]
mat_pad=numpy.full(size,numpy.nan)
mat_pad[:m,:n,...]=mat
else:
mat_pad=mat[:(m-ky)//sy*sy+ky, :(n-kx)//sx*sx+kx, ...]
view=asStride(mat_pad,ksize,stride)
if method=='max':
result=numpy.nanmax(view,axis=(2,3))
else:
result=numpy.nanmean(view,axis=(2,3))
return result
【讨论】:
这比 scikit 的 block_reduce 快 30 倍。block_reduce
:9093 function calls in 0.035 seconds
pooling
:10 function calls in 0.001 seconds
@Tyathalae 我可能在您的评论中遗漏了一些关于分析的上下文,但在我看来,如果在 0.035 秒内有 9093 个函数调用 Scikit 的 block_reduce
(每 ~3.85μs 1 个)在 0.001 秒内(每 ~0.1 毫秒 1 次)对上述池化函数只有 10 次函数调用,这是否意味着 Scikit 的 block_reduce
实际上比上述实现快约 26 倍?此外,如果我没看错的话,样本量(即函数调用)的差异会非常大。你能澄清一下吗?谢谢!
嗨@Greenstick,你是对的,我的评论有点模棱两可。它显示了为完成该操作而进行的函数(子调用)的数量。因此,scikit 总共为单个 block_reduce()
调用调用了 9093
子函数。旁注:该输出格式来自cProfile
。
啊,明白了!感谢您的澄清:)
@Sam-gege 是的,看看这个博客:numbersmithy.com/2d-and-3d-pooling-using-numpy/…【参考方案4】:
由于 numpy 文档说要“极其小心”地使用“numpy.lib.stride_tricks.as_strided”,所以这里是另一种没有它的 2D/3D 池化解决方案。
如果 strides=1,则使用相同的填充。对于 strides>1,我不能 100% 确定相同的填充是如何定义的......
def pool3D(arr,
kernel=(2, 2, 2),
stride=(1, 1, 1),
func=np.nanmax,
):
# check inputs
assert arr.ndim == 3
assert len(kernel) == 3
# create array with lots of padding around it, from which we grab stuff (could be more efficient, yes)
arr_padded_shape = arr.shape + 2 * np.array(kernel)
arr_padded = np.zeros(arr_padded_shape, dtype=arr.dtype) * np.nan
arr_padded[
kernel[0]:kernel[0] + arr.shape[0],
kernel[1]:kernel[1] + arr.shape[1],
kernel[2]:kernel[2] + arr.shape[2],
] = arr
# create temporary array, which aggregates kernel elements in last axis
size_x = 1 + (arr.shape[0]-1) // stride[0]
size_y = 1 + (arr.shape[1]-1) // stride[1]
size_z = 1 + (arr.shape[2]-1) // stride[2]
size_kernel = np.prod(kernel)
arr_tmp = np.empty((size_x, size_y, size_z, size_kernel), dtype=arr.dtype)
# fill temporary array
kx_center = (kernel[0] - 1) // 2
ky_center = (kernel[1] - 1) // 2
kz_center = (kernel[2] - 1) // 2
idx_kernel = 0
for kx in range(kernel[0]):
dx = kernel[0] + kx - kx_center
for ky in range(kernel[1]):
dy = kernel[1] + ky - ky_center
for kz in range(kernel[2]):
dz = kernel[2] + kz - kz_center
arr_tmp[:, :, :, idx_kernel] = arr_padded[
dx:dx + arr.shape[0]:stride[0],
dy:dy + arr.shape[1]:stride[1],
dz:dz + arr.shape[2]:stride[2],
]
idx_kernel += 1
# perform pool function
arr_final = func(arr_tmp, axis=-1)
return arr_final
def pool2D(arr,
kernel=(2, 2),
stride=(1, 1),
func=np.nanmax,
):
# check inputs
assert arr.ndim == 2
assert len(kernel) == 2
# transform into 3D array with empty dimension?
arr3D = arr[..., np.newaxis]
kernel3D = kernel + (1,)
stride3D = stride + (1,)
arr3D_final = pool3D(arr3D, kernel3D, stride3D, func)
arr2D_final = arr3D_final[:, :, 0]
return arr2D_final
【讨论】:
【参考方案5】:另一种解决方案使用了鲜为人知的 np.maximum.at
魔法(或者您可以使用 np.add.at 和除法使其适应均值池)
def max_pool(img, factor: int):
""" Perform max pooling with a (factor x factor) kernel"""
ds_img = np.full((img.shape[0] // factor, img.shape[1] // factor), -float('inf'), dtype=img.dtype)
np.maximum.at(ds_img, (np.arange(img.shape[0])[:, None] // factor, np.arange(img.shape[1]) // factor), img)
return ds_img
示例用法:
img = np.array([[20, 200, -5, 23],
[-13, 134, 119, 100],
[120, 32, 49, 25],
[-120, 12, 9, 23]])
print(f'Input: \nimg')
print(f"Output: \nmax_pool(img, factor=2)")
打印
Input:
[[ 20 200 -5 23]
[ -13 134 119 100]
[ 120 32 49 25]
[-120 12 9 23]]
Output:
[[200 119]
[120 49]]
不幸的是,它似乎有点慢,所以我仍然会使用 mdh 提供的解决方案
【讨论】:
以上是关于如何使用 numpy 在二维数组上执行最大/均值池化的主要内容,如果未能解决你的问题,请参考以下文章