是否可以对 NumPy 数组的递归计算进行矢量化,其中每个元素都依赖于前一个元素?

Posted

技术标签:

【中文标题】是否可以对 NumPy 数组的递归计算进行矢量化,其中每个元素都依赖于前一个元素?【英文标题】:Is it possible to vectorize recursive calculation of a NumPy array where each element depends on the previous one? 【发布时间】:2011-05-23 10:05:26 【问题描述】:
T(i) = Tm(i) + (T(i-1)-Tm(i))**(-tau(i))

Tmtau是之前计算过的长度相同的NumPy向量,希望创建一个新向量T。包含i 仅用于指示所需的元素索引。

这种情况需要for循环吗?

【问题讨论】:

听起来是列表理解的绝佳候选者,但我现在无法尝试编写。我很想看看其他人的想法。 如果tau是一个向量,它是否也应该被i索引? 边界条件是什么?即,i=0 时会发生什么? 很明显,必须有一个循环somewhere。我想你的问题是如何使循环发生在 numpy 内部,而不是在 Python 内部。如果这是真正的问题,那么如何创造性地使用numpy.fromiter 也许可以用numpy.<ufunc>.accumulate做点什么? 【参考方案1】:

您可能认为这会起作用:

import numpy as np
n = len(Tm)
t = np.empty(n)

t[0] = 0  # or whatever the initial condition is 
t[1:] = Tm[1:] + (t[0:n-1] - Tm[1:])**(-tau[1:])

但事实并非如此:您实际上不能以这种方式在 numpy 中进行递归(因为 numpy 会计算整个 RHS,然后将其分配给 LHS)。

所以除非你能想出这个公式的非递归版本,否则你会被一个显式循环困住:

tt = np.empty(n)
tt[0] = 0.
for i in range(1,n):
    tt[i] = Tm[i] + (tt[i-1] - Tm[i])**(-tau[i])

【讨论】:

使用 Numba 可以显着加速解决方案。 Numba 包含在 Anaconda 包中,非常易于使用。其他没有 Numba 的 python 代码比这里的代码更快。有关详细基准,请参阅我的答案。【参考方案2】:

为了建立 NPE 的答案,我同意某处必须有一个循环。也许您的目标是避免与 Python for 循环相关的开销?在这种情况下, numpy.fromiter 确实击败了一个 for 循环,但只有一点点:

使用非常简单的递归关系,

x[i+1] = x[i] + 0.1

我明白了

#FOR LOOP
def loopit(n):
     x = [0.0]
     for i in range(n-1): x.append(x[-1] + 0.1)
     return np.array(x)

#FROMITER
#define an iterator (a better way probably exists -- I'm a novice)
def it():
     x = 0.0
     while True:
         yield x
         x += 0.1

#use the iterator with np.fromiter
def fi_it(n):
     return np.fromiter(it(), np.float, n)

%timeit -n 100 loopit(100000)
#100 loops, best of 3: 31.7 ms per loop

%timeit -n 100 fi_it(100000)
#100 loops, best of 3: 18.6 ms per loop

有趣的是,预先分配一个 numpy 数组会导致性能大幅下降。这对我来说是个谜,尽管我猜想访问数组元素肯定比附加到列表有更多的开销。

def loopit(n):
     x = np.zeros(n)
     for i in range(n-1): x[i+1] = x[i] + 0.1
     return x

%timeit -n 100 loopit(100000)
#100 loops, best of 3: 50.1 ms per loop

【讨论】:

【参考方案3】:

2019 年更新。 Numba 代码与新版本的 numba 中断。将dtype="float32" 更改为dtype=np.float32 解决了它。

我执行了一些基准测试,在 2019 年使用 Numba 是人们应该尝试在 Numpy 中加速递归函数的首选(Aronstef 的调整提案)。 Numba 已经预装在 Anaconda 包中,并且是最快的时间之一(比任何 Python 快大约 20 倍)。在 2019 年,Python 支持 @numba 注释,无需额外步骤(至少版本 3.6、3.7 和 3.8)。以下是三个基准:在 2019 年 12 月 5 日、2018 年 10 月 20 日和 2016 年 5 月 18 日执行。

而且,正如 Jaffe 所提到的,在 2018 年仍然不可能向量化递归函数。我检查了 Aronstef 的矢量化,它不起作用。

按执行时间排序的基准:

-------------------------------------------
|Variant        |2019-12 |2018-10 |2016-05 |
-------------------------------------------
|Pure C         |   na   |   na   | 2.75 ms|
|C extension    |   na   |   na   | 6.22 ms|
|Cython float32 | 0.55 ms| 1.01 ms|   na   |
|Cython float64 | 0.54 ms| 1.05 ms| 6.26 ms|
|Fortran f2py   | 4.65 ms|   na   | 6.78 ms|
|Numba float32  |73.0  ms| 2.81 ms|   na   |
|(Aronstef)     |        |        |        |
|Numba float32v2| 1.82 ms| 2.81 ms|   na   |
|Numba float64  |78.9  ms| 5.28 ms|   na   |
|Numba float64v2| 4.49 ms| 5.28 ms|   na   |
|Append to list |73.3  ms|48.2  ms|91.0  ms|
|Using a.item() |36.9  ms|58.3  ms|74.4  ms|
|np.fromiter()  |60.8  ms|60.0  ms|78.1  ms|
|Loop over Numpy|71.3  ms|71.9  ms|87.9  ms|
|(Jaffe)        |        |        |        |
|Loop over Numpy|74.6  ms|74.4  ms|   na   |
|(Aronstef)     |        |        |        |
-------------------------------------------

答案末尾提供了对应代码。

随着时间的推移,Numba 和 Cython 似乎变得更好了。现在它们都比 Fortran f2py 快。 Cython 现在快 8.6 倍,Numba 32bit 快 2.5 倍。 Fortran 在 2016 年很难调试和编译。所以现在根本没有理由使用 Fortran。

我在 2019 年和 2018 年没有检查 Pure C 和 C 扩展,因为在 Jupyter 笔记本中编译它们并不容易。

我在 2019 年进行了以下设置:

Processor: Intel i5-9600K 3.70GHz
Versions:
Python:  3.8.0
Numba:  0.46.0
Cython: 0.29.14
Numpy:  1.17.4

我在 2018 年进行了以下设置:

Processor: Intel i7-7500U 2.7GHz
Versions:
Python:  3.7.0
Numba:  0.39.0
Cython: 0.28.5
Numpy:  1.15.1

推荐使用 float32 的 Numba 代码(调整后的 Aronstef):

@numba.jit("float32[:](float32[:], float32[:])", nopython=True, nogil=True)
def calc_py_jit32v2(Tm_, tau_):
    tt = np.empty(len(Tm_),dtype=np.float32)
    tt[0] = Tm_[0]
    for i in range(1, len(Tm_)):
        tt[i] = Tm_[i] - (tt[i-1] + Tm_[i])**(-tau_[i])
    return tt[1:]

所有其他代码:

数据创建(如 Aronstef + Mike T 评论):

np.random.seed(0)
n = 100000
Tm = np.cumsum(np.random.uniform(0.1, 1, size=n).astype('float64'))
tau = np.random.uniform(-1, 0, size=n).astype('float64')
ar = np.column_stack([Tm,tau])
Tm32 = Tm.astype('float32')
tau32 = tau.astype('float32')
Tm_l = list(Tm)
tau_l = list(tau)

2016 年的代码略有不同,因为我使用 abs() 函数来防止 nans,而不是 Mike T 的变体。2018 年的函数与 OP(原始海报)写的完全相同。

Cython float32 使用 Jupyter %% 魔法。该功能可以直接在Python中使用。 Cython 需要一个用于编译 Python 的 C++ 编译器。安装正确版本的 Visual C++ 编译器(适用于 Windows)可能会出现问题:

%%cython

import cython
import numpy as np
cimport numpy as np
from numpy cimport ndarray

cdef extern from "math.h":
    np.float32_t exp(np.float32_t m)

@cython.boundscheck(False)
@cython.wraparound(False)
@cython.infer_types(True)
@cython.initializedcheck(False)

def cy_loop32(np.float32_t[:] Tm,np.float32_t[:] tau,int alen):
    cdef np.float32_t[:] T=np.empty(alen, dtype=np.float32)
    cdef int i
    T[0]=0.0
    for i in range(1,alen):
        T[i] = Tm[i] + (T[i-1] - Tm[i])**(-tau[i])
    return T

Cython float64 使用 Jupyter %% 魔法。函数可以直接在Python中使用:

%%cython

cdef extern from "math.h":
    double exp(double m)
import cython
import numpy as np
cimport numpy as np
from numpy cimport ndarray

@cython.boundscheck(False)
@cython.wraparound(False)
@cython.infer_types(True)
@cython.initializedcheck(False)

def cy_loop(double[:] Tm,double[:] tau,int alen):
    cdef double[:] T=np.empty(alen)
    cdef int i
    T[0]=0.0
    for i in range(1,alen):
        T[i] = Tm[i] + (T[i-1] - Tm[i])**(-tau[i])
    return T

Numba float64:

@numba.jit("float64[:](float64[:], float64[:])", nopython=False, nogil=True)
def calc_py_jitv2(Tm_, tau_):
    tt = np.empty(len(Tm_),dtype=np.float64)
    tt[0] = Tm_[0]
    for i in range(1, len(Tm_)):
        tt[i] = Tm_[i] - (tt[i-1] + Tm_[i])**(-tau_[i])
    return tt[1:]

附加到列表。最快的非编译解决方案:

def rec_py_loop(Tm,tau,alen):
     T = [Tm[0]]
     for i in range(1,alen):
        T.append(Tm[i] - (T[i-1] + Tm[i])**(-tau[i]))
     return np.array(T)

使用 a.item():

def rec_numpy_loop_item(Tm_,tau_):
    n_ = len(Tm_)
    tt=np.empty(n_)
    Ti=tt.item
    Tis=tt.itemset
    Tmi=Tm_.item
    taui=tau_.item
    Tis(0,Tm_[0])
    for i in range(1,n_):
        Tis(i,Tmi(i) - (Ti(i-1) + Tmi(i))**(-taui(i)))
    return tt[1:]

np.fromiter():

def it(Tm,tau):
    T=Tm[0]
    i=0
    while True:
        yield T
        i+=1
        T=Tm[i] - (T + Tm[i])**(-tau[i])

def rec_numpy_iter(Tm,tau,alen):
    return np.fromiter(it(Tm,tau), np.float64, alen)[1:]

循环遍历 Numpy(基于 Jaffe 的想法):

def rec_numpy_loop(Tm,tau,alen):
    tt=np.empty(alen)
    tt[0]=Tm[0]
    for i in range(1,alen):
        tt[i] = Tm[i] - (tt[i-1] + Tm[i])**(-tau[i])
    return tt[1:]

循环遍历 Numpy(Aronstef 的代码)。在我的计算机上,float64np.empty 的默认类型。

def calc_py(Tm_, tau_):
    tt = np.empty(len(Tm_),dtype="float64")
    tt[0] = Tm_[0]
    for i in range(1, len(Tm_)):
        tt[i] = (Tm_[i] - (tt[i-1] + Tm_[i])**(-tau_[i]))
    return tt[1:]

纯 C 完全不使用 Python。 2016 年的版本(带有 fabs() 函数):

#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include <windows.h>
#include <sys\timeb.h> 

double randn() 
    double u = rand();
    if (u > 0.5) 
        return sqrt(-1.57079632679*log(1.0 - pow(2.0 * u - 1, 2)));
    
    else 
        return -sqrt(-1.57079632679*log(1.0 - pow(1 - 2.0 * u,2)));
    

void rec_pure_c(double *Tm, double *tau, int alen, double *T)


    for (int i = 1; i < alen; i++)
    
        T[i] = Tm[i] + pow(fabs(T[i - 1] - Tm[i]), (-tau[i]));
    


int main() 
    int N = 100000;
    double *Tm= calloc(N, sizeof *Tm);
    double *tau = calloc(N, sizeof *tau);
    double *T = calloc(N, sizeof *T);
    double time = 0;
    double sumtime = 0;
    for (int i = 0; i < N; i++)
    
        Tm[i] = randn();
        tau[i] = randn();
    

    LARGE_INTEGER StartingTime, EndingTime, ElapsedMicroseconds;
    LARGE_INTEGER Frequency;
    for (int j = 0; j < 1000; j++)
    
        for (int i = 0; i < 3; i++)
        
            QueryPerformanceFrequency(&Frequency);
            QueryPerformanceCounter(&StartingTime);

            rec_pure_c(Tm, tau, N, T);

            QueryPerformanceCounter(&EndingTime);
            ElapsedMicroseconds.QuadPart = EndingTime.QuadPart - StartingTime.QuadPart;
            ElapsedMicroseconds.QuadPart *= 1000000;
            ElapsedMicroseconds.QuadPart /= Frequency.QuadPart;
            if (i == 0)
                time = (double)ElapsedMicroseconds.QuadPart / 1000;
            else 
                if (time > (double)ElapsedMicroseconds.QuadPart / 1000)
                    time = (double)ElapsedMicroseconds.QuadPart / 1000;
            
        
        sumtime += time;
    
    printf("1000 loops,best of 3: %.3f ms per loop\n",sumtime/1000);

    free(Tm);
    free(tau);
    free(T);

Fortran f2py. 函数可以从Python 使用。 2016 年的版本(带有 abs() 函数):

subroutine rec_fortran(tm,tau,alen,result)
    integer*8, intent(in) :: alen
    real*8, dimension(alen), intent(in) :: tm
    real*8, dimension(alen), intent(in) :: tau
    real*8, dimension(alen) :: res
    real*8, dimension(alen), intent(out) :: result

    res(1)=0
    do i=2,alen
        res(i) = tm(i) + (abs(res(i-1) - tm(i)))**(-tau(i))
    end do
    result=res    
end subroutine rec_fortran

【讨论】:

是应该抛出的 numba 解决方案:“NumbaWarning:编译正在回退到启用循环提升的对象模式,因为函数“calc_py_jit32”类型推断失败......”? @Ziofil New Numba 破解了密码。我调整了代码,以便 numba 可以在 nopython 模式下编译。现在速度更快了。【参考方案4】:

更新:21-10-2018 我已经根据 cmets 更正了我的答案。

只要计算不是递归的,就可以对向量进行向量化操作。因为递归操作取决于先前的计算值,所以不可能并行处理该操作。 因此这不起作用:

def calc_vect(Tm_, tau_):
    return Tm_[1:] - (Tm_[:-1] + Tm_[1:]) ** (-tau_[1:])

由于(串行处理/循环)是必要的,因此通过尽可能接近优化的机器代码来获得最佳性能,因此 Numba 和 Cython 是这里的最佳答案。

Numba 方法可以实现如下:

init_string = """
from math import pow
import numpy as np
from numba import jit, float32

np.random.seed(0)
n = 100000
Tm = np.cumsum(np.random.uniform(0.1, 1, size=n).astype('float32'))
tau = np.random.uniform(-1, 0, size=n).astype('float32')

def calc_python(Tm_, tau_):
 tt = np.empty(len(Tm_))
 tt[0] = Tm_[0]
 for i in range(1, len(Tm_)):
     tt[i] = Tm_[i] - pow(tt[i-1] + Tm_[i], -tau_[i])
 return tt

@jit(float32[:](float32[:], float32[:]), nopython=False, nogil=True)
def calc_numba(Tm_, tau_):
  tt = np.empty(len(Tm_))
  tt[0] = Tm_[0]
  for i in range(1, len(Tm_)):
      tt[i] = Tm_[i] - pow(tt[i-1] + Tm_[i], -tau_[i])
  return tt
"""

import timeit
py_time = timeit.timeit('calc_python(Tm, tau)', init_string, number=100)
numba_time = timeit.timeit('calc_numba(Tm, tau)', init_string, number=100)
print("Python Solution: ".format(py_time))
print("Numba Soltution: ".format(numba_time))

Python 和 Numba 函数的 Timeit 比较:

Python Solution: 54.58057559299999
Numba Soltution: 1.1389029540000024

【讨论】:

此示例存在一个问题,即出现nan 值,因为它使用real exponents with negative bases。使用Tm = np.cumsum(np.random.uniform(0, 1, size=n).astype('f')) 作为解决方法。 矢量化解决方案不起作用。我检查一下。根据我的基准,这里的 Numba 解决方案是推荐的解决方案。看看我的回答中的原因。【参考方案5】:

这是个好问题。我也很想知道这是否可行,但到目前为止,除了一些简单的情况外,我还没有找到一种方法。

选项 1。numpy.ufunc.accumulate

正如@Karl Knechtel 所提到的,这似乎是一个很有前途的选择。您需要先创建一个ufunc。 This web page 解释了如何。

在以两个标量作为输入并输出一个定标器的循环函数的简单情况下,它似乎可以工作:

import numpy as np

def test_add(x, data):
    return x + data

assert test_add(1, 2) == 3
assert test_add(2, 3) == 5

# Make a Numpy ufunc from my test_add function
test_add_ufunc = np.frompyfunc(test_add, 2, 1)

assert test_add_ufunc(1, 2) == 3
assert test_add_ufunc(2, 3) == 5
assert np.all(test_add_ufunc([1, 2], [2, 3]) == [3, 5])

data_sequence = np.array([1, 2, 3, 4])
f_out = test_add_ufunc.accumulate(data_sequence, dtype=object)
assert np.array_equal(f_out, [1, 3, 6, 10])

[请注意dtype=object 参数,如上面链接的网页中所述,这是必要的]。

但在你(和我)的情况下,我们想要计算一个循环方程,它有多个数据输入(也可能有多个状态变量)。

当我使用上面的ufunc.accumulate 方法尝试此操作时,我得到了ValueError: accumulate only supported for binary functions

如果有人知道绕过该限制的方法,我会非常感兴趣。

选项 2. Python 的内置 accumulate 函数

同时,这个解决方案在 numpy 中的矢量化计算方面并没有完全实现您想要的,但它至少避免了 for 循环。

from itertools import accumulate, chain


def t_next(t, data):
    Tm, tau = data  # Unpack more than one data input
    return Tm + (t - Tm)**tau

assert t_next(2, (0.38, 0)) == 1.38

t0 = 2  # Initial t
Tm_values = np.array([0.38, 0.88, 0.56, 0.67, 0.45, 0.98, 0.58, 0.72, 0.92, 0.82])
tau_values = np.linspace(0, 0.9, 10)

# Combine the input data into a 2D array
data_sequence = np.vstack([Tm_values, tau_values]).T
t_out = np.fromiter(accumulate(chain([t0], data_sequence), t_next), dtype=float)
print(t_out)
# [2.         1.38       1.81303299 1.60614649 1.65039964 1.52579703
#  1.71878078 1.66109554 1.67839293 1.72152195 1.73091672]

# Slightly more readable version possible in Python 3.8+
t_out = np.fromiter(accumulate(data_sequence, t_next, initial=t0), dtype=float)
print(t_out)
# [2.         1.38       1.81303299 1.60614649 1.65039964 1.52579703
#  1.71878078 1.66109554 1.67839293 1.72152195 1.73091672]

【讨论】:

以上是关于是否可以对 NumPy 数组的递归计算进行矢量化,其中每个元素都依赖于前一个元素?的主要内容,如果未能解决你的问题,请参考以下文章

是否可以在 Numpy 矢量化广播操作期间访问当前索引?

利用Python进行数据分析——Numpy基础:数组和矢量计算

假设有许多重复项,使用 numpy 对“纯”函数进行矢量化

Numpy基础:数组和矢量计算

python数据分析 Numpy基础 数组和矢量计算

python numpy基础 数组和矢量计算