将函数应用于 ndarray 的每一行

Posted

技术标签:

【中文标题】将函数应用于 ndarray 的每一行【英文标题】:Apply a function to each row of a ndarray 【发布时间】:2014-04-30 04:16:39 【问题描述】:

我有这个函数来计算向量 x 的平方马氏距离的平均值:

def mahalanobis_sqdist(x, mean, Sigma):
   '''
    Calculates squared Mahalanobis Distance of vector x 
    to distibutions' mean 
   '''
   Sigma_inv = np.linalg.inv(Sigma)
   xdiff = x - mean
   sqmdist = np.dot(np.dot(xdiff, Sigma_inv), xdiff)
   return sqmdist

我有一个形状为(25, 4) 的numpy 数组。所以,我想将该函数应用于我的数组的所有 25 行,而不使用 for 循环。所以,基本上,我怎样才能写出这个循环的向量化形式:

for r in d1:
    mahalanobis_sqdist(r[0:4], mean1, Sig1)

mean1Sig1 是:

>>> mean1
array([ 5.028,  3.48 ,  1.46 ,  0.248])
>>> Sig1 = np.cov(d1[0:25, 0:4].T)
>>> Sig1
array([[ 0.16043333,  0.11808333,  0.02408333,  0.01943333],
       [ 0.11808333,  0.13583333,  0.00625   ,  0.02225   ],
       [ 0.02408333,  0.00625   ,  0.03916667,  0.00658333],
       [ 0.01943333,  0.02225   ,  0.00658333,  0.01093333]])

我尝试了以下方法,但没有成功:

>>> vecdist = np.vectorize(mahalanobis_sqdist)
>>> vecdist(d1, mean1, Sig1)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python2.7/dist-packages/numpy/lib/function_base.py", line 1862, in __call__
    theout = self.thefunc(*newargs)
  File "<stdin>", line 6, in mahalanobis_sqdist
  File "/usr/lib/python2.7/dist-packages/numpy/linalg/linalg.py", line 445, in inv
    return wrap(solve(a, identity(a.shape[0], dtype=a.dtype)))
IndexError: tuple index out of range

【问题讨论】:

scipy.spatial.distance 模块也可以为您完成所有这些工作。那么代码将是例如cdist(d1, mean1[None], 'mahalanobis')**2 如果mean1 不是点的实际平均值,则应分别计算协方差和逆并执行cdist(d1, mean1[None], 'mahalanobis', VI=Sigma_inv)**2 【参考方案1】:

要将函数应用于数组的每一行,您可以使用:

np.apply_along_axis(mahalanobis_sqdist, 1, d1, mean1, Sig1)    

但是,在这种情况下,有一个更好的方法。您不必对每一行应用函数。相反,您可以将 NumPy 操作应用于整个 d1 数组以计算相同的结果。 np.einsum 可以替换for-loop 和对np.dot 的两个调用:


def mahalanobis_sqdist2(d, mean, Sigma):
   Sigma_inv = np.linalg.inv(Sigma)
   xdiff = d - mean
   return np.einsum('ij,im,mj->i', xdiff, xdiff, Sigma_inv)

以下是一些基准:

import numpy as np
np.random.seed(1)

def mahalanobis_sqdist(x, mean, Sigma):
   '''
   Calculates squared Mahalanobis Distance of vector x 
   to distibutions mean 
   '''
   Sigma_inv = np.linalg.inv(Sigma)
   xdiff = x - mean
   sqmdist = np.dot(np.dot(xdiff, Sigma_inv), xdiff)
   return sqmdist

def mahalanobis_sqdist2(d, mean, Sigma):
   Sigma_inv = np.linalg.inv(Sigma)
   xdiff = d - mean
   return np.einsum('ij,im,mj->i', xdiff, xdiff, Sigma_inv)

def using_loop(d1, mean, Sigma):
    expected = []
    for r in d1:
        expected.append(mahalanobis_sqdist(r[0:4], mean1, Sig1))
    return np.array(expected)

d1 = np.random.random((25,4))
mean1 = np.array([ 5.028,  3.48 ,  1.46 ,  0.248])
Sig1 = np.cov(d1[0:25, 0:4].T)

expected = using_loop(d1, mean1, Sig1)
result = np.apply_along_axis(mahalanobis_sqdist, 1, d1, mean1, Sig1)
result2 = mahalanobis_sqdist2(d1, mean1, Sig1)
assert np.allclose(expected, result)
assert np.allclose(expected, result2)

In [92]: %timeit mahalanobis_sqdist2(d1, mean1, Sig1)
10000 loops, best of 3: 31.1 µs per loop
In [94]: %timeit using_loop(d1, mean1, Sig1)
1000 loops, best of 3: 569 µs per loop
In [91]: %timeit np.apply_along_axis(mahalanobis_sqdist, 1, d1, mean1, Sig1)
1000 loops, best of 3: 806 µs per loop

因此,mahalanobis_sqdist2for-loop 快大约 18 倍,比使用 np.apply_along_axis 快 26 倍。


请注意,np.apply_along_axisnp.vectorizenp.frompyfunc 是 Python 实用程序函数。他们在后台使用for-while-loops。这里没有真正的“矢量化”。它们可以提供语法帮助,但不要指望它们使您的代码性能比您自己编写的 for-loop 更好。

【讨论】:

我试过这个 np.apply_along_axis(mahalanobis_sqdist, axis=1, arr=d1, args=(mean1, Sig1)) 但我得到了以下错误: Traceback (最近一次通话最后一次): File " ",第 1 行,在 TypeError: apply_along_axis() got an unexpected keyword argument 'args' 我的错误。 args 不是关键字参数。我已经在上面更正了。 Plusone for "请注意,np.apply_along_axis、np.vectorize、np.frompyfunc 是 Python 实用程序函数。在引擎盖下,它们使用 for 或 while 循环。没有真正的“矢量化”进行这里。” - 我不知道这个,很高兴知道,谢谢!【参考方案2】:

@unutbu 的答案非常适合将任何函数应用于数组的行。 在这种特殊情况下,您可以使用一些数学对称性,如果您使用大型数组,它们将大大加快处理速度。

这是你的函数的修改版本:

def mahalanobis_sqdist3(x, mean, Sigma):
    Sigma_inv = np.linalg.inv(Sigma)
    xdiff = x - mean
    return (xdiff.dot(Sigma_inv)*xdiff).sum(axis=-1)

如果您最终使用任何类型的大 Sigma,我建议您缓存 Sigma_inv 并将其作为参数传递给您的函数。 由于在此示例中是 4x4,因此这无关紧要。 我将向遇到此问题的其他人展示如何处理大的Sigma

如果您不打算重复使用相同的Sigma,您将无法缓存它,因此,您可以使用不同的方法来求解线性系统,而不是反转矩阵。 在这里,我将使用 SciPy 中内置的 LU 分解。 仅当x 的列数相对于其行数较大时,这才会缩短时间。

这是一个展示这种方法的函数:

from scipy.linalg import lu_factor, lu_solve
def mahalanobis_sqdist4(x, mean, Sigma):
    xdiff = x - mean
    Sigma_inv = lu_factor(Sigma)
    return (xdiff.T*lu_solve(Sigma_inv, xdiff.T)).sum(axis=0)

这里有一些时间。 如另一个答案中所述,我将包含带有einsum 的版本。

import numpy as np
Sig1 = np.array([[ 0.16043333,  0.11808333,  0.02408333,  0.01943333],
                 [ 0.11808333,  0.13583333,  0.00625   ,  0.02225   ],
                 [ 0.02408333,  0.00625   ,  0.03916667,  0.00658333],
                 [ 0.01943333,  0.02225   ,  0.00658333,  0.01093333]])
mean1 = np.array([ 5.028,  3.48 ,  1.46 ,  0.248])
x = np.random.rand(25, 4)
%timeit np.apply_along_axis(mahalanobis_sqdist, 1, x, mean1, Sig1)
%timeit mahalanobis_sqdist2(x, mean1, Sig1)
%timeit mahalanobis_sqdist3(x, mean1, Sig1)
%timeit mahalanobis_sqdist4(x, mean1, Sig1)

给予:

1000 loops, best of 3: 973 µs per loop
10000 loops, best of 3: 36.2 µs per loop
10000 loops, best of 3: 40.8 µs per loop
10000 loops, best of 3: 83.2 µs per loop

但是,更改所涉及的数组的大小会更改计时结果。 比如让x = np.random.rand(2500, 4),时间是:

10 loops, best of 3: 95 ms per loop
1000 loops, best of 3: 355 µs per loop
10000 loops, best of 3: 131 µs per loop
1000 loops, best of 3: 337 µs per loop

而让x = np.random.rand(1000, 1000)Sigma1 = np.random.rand(1000, 1000)mean1 = np.random.rand(1000),时间是:

1 loops, best of 3: 1min 24s per loop
1 loops, best of 3: 2.39 s per loop
10 loops, best of 3: 155 ms per loop
10 loops, best of 3: 99.9 ms per loop

编辑:我注意到其他答案之一使用了 Cholesky 分解。 鉴于Sigma 是对称且正定的,我们实际上可以做得比我上面的结果更好。 有一些来自 BLAS 和 LAPACK 的好例程可通过 SciPy 获得,它们可以处理对称正定矩阵。 这里有两个更快的版本。

from scipy.linalg.fblas import dsymm
def mahalanobis_sqdist5(x, mean, Sigma_inv):
    xdiff = x - mean
    Sigma_inv = la.inv(Sigma)
    return np.einsum('...i,...i->...',dsymm(1., Sigma_inv, xdiff.T).T, xdiff)
from scipy.linalg.flapack import dposv
def mahalanobis_sqdist6(x, mean, Sigma):
    xdiff = x - mean
    return np.einsum('...i,...i->...', xdiff, dposv(Sigma, xdiff.T)[1].T)

第一个仍然反转 Sigma。 如果您预先计算逆并重用它,它会更快(1000x1000 的情况在我的机器上使用预先计算的逆需要 35.6 毫秒)。 我还使用 einsum 取产品,然后沿最后一个轴求和。 这最终比执行(A * B).sum(axis=-1) 之类的操作要快一些。 这两个函数给出了以下时序:

第一个测试用例:

10000 loops, best of 3: 55.3 µs per loop
100000 loops, best of 3: 14.2 µs per loop

第二个测试用例:

10000 loops, best of 3: 121 µs per loop
10000 loops, best of 3: 79 µs per loop

第三个测试用例:

10 loops, best of 3: 92.5 ms per loop
10 loops, best of 3: 48.2 ms per loop

【讨论】:

非常好!我不需要为每个数据点计算 Sigma_inv!真的很有趣的讨论!我喜欢它【参考方案3】:

刚刚在reddit 上看到一个非常好的评论,这可能会加快速度:

对于经常使用 numpy 的人来说,这并不奇怪。对于循环 在 python 中非常慢。实际上,einsum 也很慢。 如果您有很多向量(500 4维向量足以使这个版本比 einsum 在我的机器上):

def no_einsum(d, mean, Sigma):
    L_inv = np.linalg.inv(numpy.linalg.cholesky(Sigma))
    xdiff = d - mean
    return np.sum(np.dot(xdiff, L_inv.T)**2, axis=1)

如果你的点也是高维的,那么计算倒数是 慢(无论如何通常是个坏主意),您可以通过以下方式节省时间 直接求解系统(250 维 500 个向量就足够了 使这个版本在我的机器上最快):

def no_einsum_solve(d, mean, Sigma):
    L = numpy.linalg.cholesky(Sigma)
    xdiff = d - mean
    return np.sum(np.linalg.solve(L, xdiff.T)**2, axis=0)

【讨论】:

Sigma 是对称正定的。我错过了。这使得 Cholesky 分解成为一个可行的选择。【参考方案4】:

问题是np.vectorize 对所有参数进行矢量化,但您只需要对第一个参数进行矢量化。您需要使用excluded 关键字参数到vectorize

np.vectorize(mahalanobis_sqdist, excluded=[1, 2])

【讨论】:

以上是关于将函数应用于 ndarray 的每一行的主要内容,如果未能解决你的问题,请参考以下文章

如何将函数应用于 MATLAB 中矩阵的每一行/列?

将函数应用于pandas Python中的每一行时出现数据转换错误

将数据帧返回函数应用于基础数据帧的每一行

将函数应用于 data.frame 中的每一行并将结果附加到 R 中的 data.frame

将标量函数应用于每一行

将邮政编码 API 调用应用于数据框中的每一行