什么是创建 NumPy 数组的多个幂的矢量化方法?

Posted

技术标签:

【中文标题】什么是创建 NumPy 数组的多个幂的矢量化方法?【英文标题】:What is a vectorized way to create multiple powers of a NumPy array? 【发布时间】:2018-10-29 22:03:35 【问题描述】:

我有一个 NumPy 数组:

arr = [[1, 2],
       [3, 4]]

我想创建一个新数组,其中包含 arrorder 的幂:

# arr_new = [arr^0, arr^1, arr^2, arr^3,...arr^order]
arr_new = [[1, 1, 1, 2, 1, 4, 1, 8],
           [1, 1, 3, 4, 9, 16, 27, 64]]

我目前的方法使用for 循环:

# Pre-allocate an array for powers
arr = np.array([[1, 2],[3,4]])
order = 3
rows, cols = arr.shape
arr_new = np.zeros((rows, (order+1) * cols))

# Iterate over each exponent
for i in range(order + 1):
    arr_new[:, (i * cols) : (i + 1) * cols] = arr**i
print(arr_new)

是否有更快(即矢量化)的方法来创建数组的幂?


基准测试

感谢@hpaulj、@Divakar 和@Paul Panzer 的解答。我在以下测试数组上对基于循环和基于广播的操作进行了基准测试。

arr = np.array([[1, 2],
                [3,4]])
order = 3

arrLarge = np.random.randint(0, 10, (100, 100))  # 100 x 100 array
orderLarge = 10

loop_based 函数是:

def loop_based(arr, order):
    # pre-allocate an array for powers
    rows, cols = arr.shape
    arr_new = np.zeros((rows, (order+1) * cols))
    # iterate over each exponent
    for i in range(order + 1):
        arr_new[:, (i * cols) : (i + 1) * cols] = arr**i
    return arr_new

使用hstackbroadcast_based 函数是:

def broadcast_based_hstack(arr, order):
    # Create a 3D exponent array for a 2D input array to force broadcasting
    powers = np.arange(order + 1)[:, None, None]
    # Generate values (third axis contains array at various powers)
    exponentiated = arr ** powers
    # Reshape and return array
    return np.hstack(exponentiated)   # <== using hstack function

使用reshapebroadcast_based 函数是:

def broadcast_based_reshape(arr, order):
    # Create a 3D exponent array for a 2D input array to force broadcasting
    powers = np.arange(order + 1)[:, None]
    # Generate values (3-rd axis contains array at various powers)
    exponentiated = arr[:, None] ** powers
    # reshape and return array
    return exponentiated.reshape(arr.shape[0], -1)  # <== using reshape function

broadcast_based 函数使用累积积 cumprodreshape

def broadcast_cumprod_reshape(arr, order):
    rows, cols = arr.shape
    # Create 3D empty array where the middle dimension is
    # the array at powers 0 through order
    out = np.empty((rows, order + 1, cols), dtype=arr.dtype)
    out[:, 0, :] = 1   # 0th power is always 1
    a = np.broadcast_to(arr[:, None], (rows, order, cols))
    # Cumulatively multiply arrays so each multiplication produces the next order
    np.cumprod(a, axis=1, out=out[:,1:,:])
    return out.reshape(rows, -1)

在 Jupyter notebook 上,我使用timeit 命令得到了这些结果:

小阵列 (2x2)

%timeit -n 100000 loop_based(arr, order)
7.41 µs ± 174 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

%timeit -n 100000 broadcast_based_hstack(arr, order)
10.1 µs ± 137 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

%timeit -n 100000 broadcast_based_reshape(arr, order)
3.31 µs ± 61.5 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

%timeit -n 100000 broadcast_cumprod_reshape(arr, order)
11 µs ± 102 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

大型阵列 (100x100)

%timeit -n 1000 loop_based(arrLarge, orderLarge)
261 µs ± 5.82 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

%timeit -n 1000 broadcast_based_hstack(arrLarge, orderLarge)
225 µs ± 4.15 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

%timeit -n 1000 broadcast_based_reshape(arrLarge, orderLarge)
223 µs ± 2.16 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

%timeit -n 1000 broadcast_cumprod_reshape(arrLarge, orderLarge)
157 µs ± 1.02 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

结论:

似乎使用reshape 的基于广播的方法对于较小的数组更快。但是,对于大型数组,cumprod 方法的扩展性更好,速度更快。

【问题讨论】:

broadcast_based_reshape 看起来不对。扩展数组的顺序不同。请关注我的帖子以获取确切的代码。 我尝试了两种方法:reshape- 扩展输入数组 arr 和不扩展输入数组。似乎它们的运行时间在彼此的误差范围内。我理解为什么要制作更高维的指数数组(np.arange(order+1)[:,None])来强制广播。不知道你为什么还用arr[:, None] 需要扩展两个数组以便稍后使用 reshape,其想法是在输出中根据需要进行排序。我更多地谈论的是输出的正确性,而不是那里的性能。 谢谢 - 我意识到我在实施你的方法时犯了错误。固定(我认为)。 是的,现在看起来不错。 【参考方案1】:

您正在创建一个带有重塑的Vandermonde matrix,因此最好使用numpy.vander 来创建它,并让其他人负责最好的算法。

这样你的代码就是:

np.vander(arr.ravel(), order).reshape((arr.shape[0], -1))

也就是说,他们似乎使用了像 Paul Panzer 的 cumprod 方法 under the hood 这样的东西,所以它应该可以很好地扩展。

【讨论】:

谢谢!我想知道在不确切知道要寻​​找什么的情况下,如何寻找这样的功能。 就我而言,几年前我在学习 Reed Solomon 编码时遇到了它。我忘记了关于那些东西的所有其他内容,但范德蒙德矩阵不断出现!【参考方案2】:

这是一个使用累积乘法的解决方案,它比基于幂的方法具有更好的扩展性,尤其是当输入数组是 float dtype 时:

import numpy as np

def f_mult(a, k):
    m, n = a.shape
    out = np.empty((m, k, n), dtype=a.dtype)
    out[:, 0, :] = 1
    a = np.broadcast_to(a[:, None], (m, k-1, n))
    a.cumprod(axis=1, out=out[:, 1:])
    return out.reshape(m, -1)

时间安排:

int up to power 9
divakar: 0.4342731796205044 ms
hpaulj:  0.794165057130158 ms
pp:      0.20520629966631532 ms
float up to power 39
divakar: 29.056487752124667 ms
hpaulj:  31.773792404681444 ms
pp:      1.0329263447783887 ms

计时代码,感谢@Divakar:

def f_divakar(a, k):
    return (a[:,None]**np.arange(k)[:,None]).reshape(a.shape[0],-1)

def f_hpaulj(a, k):
    return np.hstack(a**np.arange(k)[:,None,None])


from timeit import timeit

np.random.seed(0)
a = np.random.randint(0,9,(100,100))
k = 10
print('int up to power 9')
print('divakar:', timeit(lambda: f_divakar(a, k), number=1000), 'ms')
print('hpaulj: ', timeit(lambda: f_hpaulj(a, k), number=1000), 'ms')
print('pp:     ', timeit(lambda: f_mult(a, k), number=1000), 'ms')

a = np.random.uniform(0.5,2.0,(100,100))
k = 40
print('float up to power 39')
print('divakar:', timeit(lambda: f_divakar(a, k), number=1000), 'ms')
print('hpaulj: ', timeit(lambda: f_hpaulj(a, k), number=1000), 'ms')
print('pp:     ', timeit(lambda: f_mult(a, k), number=1000), 'ms')

【讨论】:

嗯 - 当我实施 cumprod 方法时,我得到了不同的结果。我对您的答案的实现已添加到我的基准测试编辑中。 @hazrmad,正如我所说,它扩展更好,即只要问题规模足够大,它就会更快。您的测试示例是如此之小,以至于您主要测量开销。 谢谢!我用 100x100 阵列进行了测试,确实你的方法更快。我会添加注释。【参考方案3】:

reshaping 的帮助下,让broadcasting 发挥其魔法的作用,将阵列扩展到更高的暗度 -

In [16]: arr = np.array([[1, 2],[3,4]])

In [17]: order = 3

In [18]: (arr[:,None]**np.arange(order+1)[:,None]).reshape(arr.shape[0],-1)
Out[18]: 
array([[ 1,  1,  1,  2,  1,  4,  1,  8],
       [ 1,  1,  3,  4,  9, 16, 27, 64]])

请注意,arr[:,None] 本质上是 arr[:,None,:],但为简洁起见,我们可以跳过尾随省略号。

更大数据集上的时间 -

In [40]: np.random.seed(0)
    ...: arr = np.random.randint(0,9,(100,100))
    ...: order = 10

# @hpaulj's soln with broadcasting and stacking
In [41]: %timeit np.hstack(arr **np.arange(order+1)[:,None,None])
1000 loops, best of 3: 734 µs per loop

In [42]: %timeit (arr[:,None]**np.arange(order+1)[:,None]).reshape(arr.shape[0],-1)
1000 loops, best of 3: 401 µs per loop

重塑部分实际上是免费的,这就是我们在这里获得性能的地方,当然还有广播部分,如下面的细分所示 -

In [52]: %timeit (arr[:,None]**np.arange(order+1)[:,None])
1000 loops, best of 3: 390 µs per loop

In [53]: %timeit (arr[:,None]**np.arange(order+1)[:,None]).reshape(arr.shape[0],-1)
1000 loops, best of 3: 401 µs per loop

【讨论】:

【参考方案4】:

使用广播生成值,并根据需要重新调整或重新排列值:

In [34]: arr **np.arange(4)[:,None,None]
Out[34]: 
array([[[ 1,  1],
        [ 1,  1]],

       [[ 1,  2],
        [ 3,  4]],

       [[ 1,  4],
        [ 9, 16]],

       [[ 1,  8],
        [27, 64]]])
In [35]: np.hstack(_)
Out[35]: 
array([[ 1,  1,  1,  2,  1,  4,  1,  8],
       [ 1,  1,  3,  4,  9, 16, 27, 64]])

【讨论】:

以上是关于什么是创建 NumPy 数组的多个幂的矢量化方法?的主要内容,如果未能解决你的问题,请参考以下文章

多个维度的 NumPy PolyFit 和 PolyVal?

使用Numpy以矢量化方式检索多个值的索引

Numpy常用概念-对象的副本和视图向量化广播机制

是否可以对 NumPy 数组的递归计算进行矢量化,其中每个元素都依赖于前一个元素?

向量化numpy追加循环

如何矢量化 numpy 数组的 2x2 子数组的平均值?