如何对 numpy 数组进行 n-D 距离和最近邻计算

Posted

技术标签:

【中文标题】如何对 numpy 数组进行 n-D 距离和最近邻计算【英文标题】:How to do n-D distance and nearest neighbor calculations on numpy arrays 【发布时间】:2019-02-21 07:12:35 【问题描述】:

此问题旨在成为规范的重复目标

给定两个形状(i, n)(j, n)的数组XY,代表n维坐标列表,

def test_data(n, i, j, r = 100):
    X = np.random.rand(i, n) * r - r / 2
    Y = np.random.rand(j, n) * r - r / 2
    return X, Y

X, Y = test_data(3, 1000, 1000)

最快的查找方法是什么:

    D 形状为(i,j) 的距离X 中的每个点与Y 中的每个点之间的距离 k 最近邻的索引k_i 和距离k_d 针对X 中的所有点Y 中的每个点 X 中每个点的索引r_ir_j 和距离r_dY 中每个点r 的距离r

鉴于以下限制:

仅使用numpy 使用任何python

包括特殊情况:

YX

在所有情况下,distance 主要表示Euclidean distance,但请随意突出显示允许其他距离计算的方法。

【问题讨论】:

根据我对 numpy 的经验,使用内部广播的重载运算符、覆盖变量以及在一行中编写大部分计算(因此 GIL 将适用)将是最快的方法。例如,要测量从 vector xmatrix Y 的距离,您应该使用 dists = np.sqrt(np.sum(np.square(Y-x),axis=1))。如果您不需要实际距离,而只需要平方和,请放弃 np.sqrt 操作。此外,由于在此方法中您至少迭代一个轴,因此使用最小值 min(X.shape[0],Y.shape[0]) 进行迭代。包:使用 sklearn 实现。 顺便说一句,您还有许多已实现的方法,例如np.argsort 等,对于您的实现而言,这些方法将变得很少。如果您迭代样本,您可能需要在并行计算样本和矩阵之间的距离上投入一点,使用 multiprocessing 本机包。 【参考方案1】:

#1。所有距离

仅使用numpy

天真的方法是:

D = np.sqrt(np.sum((X[:, None, :] - Y[None, :, :])**2, axis = -1))

但是这会占用大量内存来创建(i, j, n) 形的中间矩阵,而且非常慢

但是,感谢@Divakar(eucl_dist 包,wiki)的一个技巧,我们可以使用一些代数和np.einsum 来分解:(X - Y)**2 = X**2 - 2*X*Y + Y**2

D = np.sqrt(                                #  (X - Y) ** 2   
np.einsum('ij, ij ->i', X, X)[:, None] +    # = X ** 2        \
np.einsum('ij, ij ->i', Y, Y)          -    # + Y ** 2        \
2 * X.dot(Y.T))                             # - 2 * X * Y
YX

同上:

XX = np.einsum('ij, ij ->i', X, X)
D = np.sqrt(XX[:, None] + XX - 2 * X.dot(X.T))

请注意,使用这种方法,浮点不精确会使对角线项偏离零非常轻微。如果您需要确保它们为零,则需要显式设置它:

np.einsum('ii->i', D)[:] = 0 
任何包

scipy.spatial.distance.cdist 是最直观的内置函数,比裸numpy 快得多

from scipy.spatial.distance import cdist
D = cdist(X, Y)

cdist 还可以处理许多距离度量以及用户定义的距离度量(尽管这些都没有优化)。有关详细信息,请查看上面链接的文档。

YX

对于自指距离,scipy.spatial.distance.pdist 的工作方式与cdist 类似,但返回一维压缩距离数组,通过仅将每个项设置一次来节省对称距离矩阵的空间。您可以使用squareform 将其转换为方阵

from scipy.spatial.distance import pdist, squareform
D_cond = pdist(X)
D = squareform(D_cond)

#2。 K 最近邻 (KNN)

仅使用numpy

我们可以使用np.argpartition 来获取k-nearest 索引并使用它们来获取相应的距离值。因此,使用D 作为包含上面获得的距离值的数组,我们将拥有 -

if k == 1:
    k_i = D.argmin(0)
else:
    k_i = D.argpartition(k, axis = 0)[:k]
k_d = np.take_along_axis(D, k_i, axis = 0)

但是,我们可以通过在减少数据集之前不取平方根来加快速度。 np.sqrt 是计算欧几里得范数最慢的部分,所以我们不想直到最后才这样做。

D_sq = np.einsum('ij, ij ->i', X, X)[:, None] +\
       np.einsum('ij, ij ->i', Y, Y) - 2 * X.dot(Y.T)
if k == 1:
    k_i = D_sq.argmin(0)
else:
    k_i = D_sq.argpartition(k, axis = 0)[:k]
k_d = np.sqrt(np.take_along_axis(D_sq, k_i, axis = 0))

现在,np.argpartition 执行间接分区,并不一定给我们排序顺序的元素,只确保第一个 k 元素是最小的。因此,对于排序输出,我们需要在上一步的输出上使用argsort -

sorted_idx = k_d.argsort(axis = 0)
k_i_sorted = np.take_along_axis(k_i, sorted_idx, axis = 0)
k_d_sorted = np.take_along_axis(k_d, sorted_idx, axis = 0)

如果你只需要k_i,你根本不需要平方根:

D_sq = np.einsum('ij, ij ->i', X, X)[:, None] +\
       np.einsum('ij, ij ->i', Y, Y) - 2 * X.dot(Y.T)
if k == 1:
    k_i = D_sq.argmin(0)
else:
    k_i = D_sq.argpartition(k, axis = 0)[:k]
k_d_sq = np.take_along_axis(D_sq, k_i, axis = 0)
sorted_idx = k_d_sq.argsort(axis = 0)
k_i_sorted = np.take_along_axis(k_i, sorted_idx, axis = 0)
XY

在上面的代码中,替换:

D_sq = np.einsum('ij, ij ->i', X, X)[:, None] +\
       np.einsum('ij, ij ->i', Y, Y) - 2 * X.dot(Y.T)

与:

XX = np.einsum('ij, ij ->i', X, X)
D_sq = XX[:, None] + XX - 2 * X.dot(X.T))
任何包

KD-Tree 是一种更快的方法来查找邻居和限制距离。请注意,虽然 KDTree 通常比上述 3d 的蛮力解决方案快得多(只要 oyu 有超过 8 个点),如果你有 n-dimensions,KDTree 只有在你有超过 2**n 点时才能很好地扩展.有关高维的讨论和更高级的方法,请参阅Here

最推荐的实现KDTree的方法是使用scipyscipy.spatial.KDTreescipy.spatial.cKDTree

from scipy.spatial import KDTree
X_tree = KDTree(X)
k_d, k_i = X_tree.query(Y, k = k)

不幸的是,scipy 的 KDTree 实现速度很慢,并且对于较大的数据集容易出现段错误。正如@HansMusgrave here 所指出的,pykdtree 大大提高了性能,但不像scipy 那样常见,并且目前只能处理欧几里得距离(而scipy 中的KDTree 可以处理任意阶的 Minkowsi p-范数)

XY

改用:

k_d, k_i = X_tree.query(X, k = k)
任意指标

BallTree 具有与 KDTree 相似的算法属性。我不知道 Python 中的并行/矢量化/快速 BallTree,但是使用 scipy 我们仍然可以对用户定义的指标进行合理的 KNN 查询。如果可用,内置指标会更快。

def d(a, b):
    return max(np.abs(a-b))

tree = sklearn.neighbors.BallTree(X, metric=d)
k_d, k_i = tree.query(Y)

如果d() 不是metric,则此答案将是错误的。 BallTree 比蛮力更快的唯一原因是因为度量的属性允许它排除某些解决方案。对于真正的任意函数,蛮力实际上是必要的。

#3。半径搜索

仅使用numpy

最简单的方法就是使用布尔索引:

mask = D_sq < r**2
r_i, r_j = np.where(mask)
r_d = np.sqrt(D_sq[mask])
任何包

同上,可以使用scipy.spatial.KDTree.query_ball_point

r_ij = X_tree.query_ball_point(Y, r = r)

scipy.spatial.KDTree.query_ball_tree

Y_tree = KDTree(Y)
r_ij = X_tree.query_ball_tree(Y_tree, r = r)

不幸的是,r_ij 最终成为一个索引数组列表,这些索引数组有点难以解开以供以后使用。

更简单的是使用cKDTreesparse_distance_matrix,它可以输出coo_matrix

from scipy.spatial import cKDTree
X_cTree = cKDTree(X)
Y_cTree = cKDTree(Y)
D_coo = X_cTree.sparse_distance_matrix(Y_cTree, r = r, output_type = `coo_matrix`)
r_i = D_coo.row
r_j = D_coo.column
r_d = D_coo.data

这是距离矩阵非常灵活的格式,因为它仍然是一个实际矩阵(如果转换为csr)也可以用于许多矢量化操作。

【讨论】:

值得一提的是,既然平方根是单调的,我们可以直接使用einsum,而不必理会np.sqrt 我把它放在#2下,一旦我们开始减少我们想要返回的距离数量。对于第 1 部分,我们需要所有距离,因此我们需要所有的平方根。 使用np.einsum 计算D_sq 是否比使用pdist(X, 'sqeuclidean') 更快? @Joe, D_sq 用于numpy-only 计算(许多第 3 方脚本 API 实现 numpy,但不是其他包)。 pdist 是一个 scipy 函数

以上是关于如何对 numpy 数组进行 n-D 距离和最近邻计算的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Numpy 中“压缩”几个 N-D 数组?

将具有 n 级分层索引的 Pandas DataFrame 转换为 n-D Numpy 数组

基于欧几里得距离的 1-最近邻分类器如何对观察进行分类

根据相互距离对 2D/3D 点数组进行排序的启发式方法

『cs231n』作业1问题1选讲_通过代码理解K近邻算法&交叉验证选择超参数参数

K-近邻算法(K-NN)