如何对 numpy 数组进行 n-D 距离和最近邻计算
Posted
技术标签:
【中文标题】如何对 numpy 数组进行 n-D 距离和最近邻计算【英文标题】:How to do n-D distance and nearest neighbor calculations on numpy arrays 【发布时间】:2019-02-21 07:12:35 【问题描述】:此问题旨在成为规范的重复目标
给定两个形状(i, n)
和(j, n)
的数组X
和Y
,代表n
维坐标列表,
def test_data(n, i, j, r = 100):
X = np.random.rand(i, n) * r - r / 2
Y = np.random.rand(j, n) * r - r / 2
return X, Y
X, Y = test_data(3, 1000, 1000)
最快的查找方法是什么:
D
形状为(i,j)
的距离X
中的每个点与Y
中的每个点之间的距离
k
最近邻的索引k_i
和距离k_d
针对X
中的所有点Y
中的每个点
X
中每个点的索引r_i
、r_j
和距离r_d
在Y
中每个点r
的距离r
内
鉴于以下限制:
仅使用numpy
使用任何python
包
包括特殊情况:
Y
是 X
在所有情况下,distance 主要表示Euclidean distance,但请随意突出显示允许其他距离计算的方法。
【问题讨论】:
根据我对 numpy 的经验,使用内部广播的重载运算符、覆盖变量以及在一行中编写大部分计算(因此 GIL 将适用)将是最快的方法。例如,要测量从 vectorx
到 matrix Y
的距离,您应该使用 dists = np.sqrt(np.sum(np.square(Y-x),axis=1))
。如果您不需要实际距离,而只需要平方和,请放弃 np.sqrt
操作。此外,由于在此方法中您至少迭代一个轴,因此使用最小值 min(X.shape[0],Y.shape[0])
进行迭代。包:使用 sklearn 实现。
顺便说一句,您还有许多已实现的方法,例如np.argsort
等,对于您的实现而言,这些方法将变得很少。如果您迭代样本,您可能需要在并行计算样本和矩阵之间的距离上投入一点,使用 multiprocessing
本机包。
【参考方案1】:
#1。所有距离
仅使用numpy
天真的方法是:
D = np.sqrt(np.sum((X[:, None, :] - Y[None, :, :])**2, axis = -1))
但是这会占用大量内存来创建(i, j, n)
形的中间矩阵,而且非常慢
但是,感谢@Divakar(eucl_dist
包,wiki)的一个技巧,我们可以使用一些代数和np.einsum
来分解:(X - Y)**2 = X**2 - 2*X*Y + Y**2
D = np.sqrt( # (X - Y) ** 2
np.einsum('ij, ij ->i', X, X)[:, None] + # = X ** 2 \
np.einsum('ij, ij ->i', Y, Y) - # + Y ** 2 \
2 * X.dot(Y.T)) # - 2 * X * Y
Y
是X
同上:
XX = np.einsum('ij, ij ->i', X, X)
D = np.sqrt(XX[:, None] + XX - 2 * X.dot(X.T))
请注意,使用这种方法,浮点不精确会使对角线项偏离零非常轻微。如果您需要确保它们为零,则需要显式设置它:
np.einsum('ii->i', D)[:] = 0
任何包
scipy.spatial.distance.cdist
是最直观的内置函数,比裸numpy
快得多
from scipy.spatial.distance import cdist
D = cdist(X, Y)
cdist
还可以处理许多距离度量以及用户定义的距离度量(尽管这些都没有优化)。有关详细信息,请查看上面链接的文档。
Y
是X
对于自指距离,scipy.spatial.distance.pdist
的工作方式与cdist
类似,但返回一维压缩距离数组,通过仅将每个项设置一次来节省对称距离矩阵的空间。您可以使用squareform
将其转换为方阵
from scipy.spatial.distance import pdist, squareform
D_cond = pdist(X)
D = squareform(D_cond)
#2。 K 最近邻 (KNN)
仅使用numpy
我们可以使用np.argpartition
来获取k-nearest
索引并使用它们来获取相应的距离值。因此,使用D
作为包含上面获得的距离值的数组,我们将拥有 -
if k == 1:
k_i = D.argmin(0)
else:
k_i = D.argpartition(k, axis = 0)[:k]
k_d = np.take_along_axis(D, k_i, axis = 0)
但是,我们可以通过在减少数据集之前不取平方根来加快速度。 np.sqrt
是计算欧几里得范数最慢的部分,所以我们不想直到最后才这样做。
D_sq = np.einsum('ij, ij ->i', X, X)[:, None] +\
np.einsum('ij, ij ->i', Y, Y) - 2 * X.dot(Y.T)
if k == 1:
k_i = D_sq.argmin(0)
else:
k_i = D_sq.argpartition(k, axis = 0)[:k]
k_d = np.sqrt(np.take_along_axis(D_sq, k_i, axis = 0))
现在,np.argpartition
执行间接分区,并不一定给我们排序顺序的元素,只确保第一个 k
元素是最小的。因此,对于排序输出,我们需要在上一步的输出上使用argsort
-
sorted_idx = k_d.argsort(axis = 0)
k_i_sorted = np.take_along_axis(k_i, sorted_idx, axis = 0)
k_d_sorted = np.take_along_axis(k_d, sorted_idx, axis = 0)
如果你只需要k_i
,你根本不需要平方根:
D_sq = np.einsum('ij, ij ->i', X, X)[:, None] +\
np.einsum('ij, ij ->i', Y, Y) - 2 * X.dot(Y.T)
if k == 1:
k_i = D_sq.argmin(0)
else:
k_i = D_sq.argpartition(k, axis = 0)[:k]
k_d_sq = np.take_along_axis(D_sq, k_i, axis = 0)
sorted_idx = k_d_sq.argsort(axis = 0)
k_i_sorted = np.take_along_axis(k_i, sorted_idx, axis = 0)
X
是Y
在上面的代码中,替换:
D_sq = np.einsum('ij, ij ->i', X, X)[:, None] +\
np.einsum('ij, ij ->i', Y, Y) - 2 * X.dot(Y.T)
与:
XX = np.einsum('ij, ij ->i', X, X)
D_sq = XX[:, None] + XX - 2 * X.dot(X.T))
任何包
KD-Tree 是一种更快的方法来查找邻居和限制距离。请注意,虽然 KDTree 通常比上述 3d 的蛮力解决方案快得多(只要 oyu 有超过 8 个点),如果你有 n
-dimensions,KDTree 只有在你有超过 2**n
点时才能很好地扩展.有关高维的讨论和更高级的方法,请参阅Here
最推荐的实现KDTree的方法是使用scipy
的scipy.spatial.KDTree
或scipy.spatial.cKDTree
from scipy.spatial import KDTree
X_tree = KDTree(X)
k_d, k_i = X_tree.query(Y, k = k)
不幸的是,scipy
的 KDTree 实现速度很慢,并且对于较大的数据集容易出现段错误。正如@HansMusgrave here 所指出的,pykdtree
大大提高了性能,但不像scipy
那样常见,并且目前只能处理欧几里得距离(而scipy
中的KDTree
可以处理任意阶的 Minkowsi p-范数)
X
是Y
改用:
k_d, k_i = X_tree.query(X, k = k)
任意指标
BallTree 具有与 KDTree 相似的算法属性。我不知道 Python 中的并行/矢量化/快速 BallTree,但是使用 scipy 我们仍然可以对用户定义的指标进行合理的 KNN 查询。如果可用,内置指标会更快。
def d(a, b):
return max(np.abs(a-b))
tree = sklearn.neighbors.BallTree(X, metric=d)
k_d, k_i = tree.query(Y)
如果d()
不是metric,则此答案将是错误的。 BallTree 比蛮力更快的唯一原因是因为度量的属性允许它排除某些解决方案。对于真正的任意函数,蛮力实际上是必要的。
#3。半径搜索
仅使用numpy
最简单的方法就是使用布尔索引:
mask = D_sq < r**2
r_i, r_j = np.where(mask)
r_d = np.sqrt(D_sq[mask])
任何包
同上,可以使用scipy.spatial.KDTree.query_ball_point
r_ij = X_tree.query_ball_point(Y, r = r)
或scipy.spatial.KDTree.query_ball_tree
Y_tree = KDTree(Y)
r_ij = X_tree.query_ball_tree(Y_tree, r = r)
不幸的是,r_ij
最终成为一个索引数组列表,这些索引数组有点难以解开以供以后使用。
更简单的是使用cKDTree
的sparse_distance_matrix
,它可以输出coo_matrix
from scipy.spatial import cKDTree
X_cTree = cKDTree(X)
Y_cTree = cKDTree(Y)
D_coo = X_cTree.sparse_distance_matrix(Y_cTree, r = r, output_type = `coo_matrix`)
r_i = D_coo.row
r_j = D_coo.column
r_d = D_coo.data
这是距离矩阵非常灵活的格式,因为它仍然是一个实际矩阵(如果转换为csr
)也可以用于许多矢量化操作。
【讨论】:
值得一提的是,既然平方根是单调的,我们可以直接使用einsum,而不必理会np.sqrt
?
我把它放在#2下,一旦我们开始减少我们想要返回的距离数量。对于第 1 部分,我们需要所有距离,因此我们需要所有的平方根。
使用np.einsum
计算D_sq
是否比使用pdist(X, 'sqeuclidean')
更快?
@Joe, D_sq
用于numpy
-only 计算(许多第 3 方脚本 API 实现 numpy
,但不是其他包)。 pdist
是一个 scipy
函数以上是关于如何对 numpy 数组进行 n-D 距离和最近邻计算的主要内容,如果未能解决你的问题,请参考以下文章
将具有 n 级分层索引的 Pandas DataFrame 转换为 n-D Numpy 数组