Parallelizing nearest neighbor search with ray
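Suppose I have a large sparse matrix. For each row vector of the matrix, I want to compute the cosine distance to its nearest neighbor among the previous window_size rows.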
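Written out, with x_i denoting row i of the matrix, w = window_size, and cosine distance taken as one minus cosine similarity (the definition sklearn's cosine_distances uses), the quantity wanted for each row i ≥ 1 is:

```latex
d_i = \min_{\max(0,\, i-w) \,\le\, j \,<\, i} \left( 1 - \frac{x_i \cdot x_j}{\lVert x_i \rVert \, \lVert x_j \rVert} \right), \qquad i = 1, \dots, n-1
```

All-zero rows, which are common in a sparse random matrix, get a cosine similarity of 0 in sklearn, so their distance to any other row is 1.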
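Because sklearn.neighbors does not reduce the runtime when run in parallel (see this issue on GitHub), I tried to parallelize the process with ray instead. My ray version is faster than sklearn's parallel version, but it is still slower than simply computing the distances serially.
My code is below. What am I doing wrong, and how can I improve it?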
import scipy.sparse
from sklearn.metrics.pairwise import cosine_distances
from sklearn.neighbors import NearestNeighbors
import numpy as np
import timeit
import ray
import math

n = 4000
m = 100
big_matrix = scipy.sparse.random(n, m).tocsr()
window_size = 1000
retry = 1
n_jobs = 4

ray.init(num_cpus=n_jobs)


def simple_cosine_distance():
    # Serial baseline: one cosine_distances call per row against its window.
    distances = np.zeros(n)
    for i in range(1, n):
        past = big_matrix[max(0, i - window_size):i]
        query = big_matrix[i]
        distances[i] = cosine_distances(query, past).min(axis=1)


def sklearn_nneighbor():
    # Refit a single-job NearestNeighbors index on every window.
    distances = np.zeros(n)
    for i in range(1, n):
        past = big_matrix[max(0, i - window_size):i]
        nn = NearestNeighbors(metric="cosine", n_neighbors=1, n_jobs=1)
        nn.fit(X=past)
        query = big_matrix[i]
        distance, _ = nn.kneighbors(query)
        distances[i] = distance[0]


def sklearn_nneighbor_parallel():
    # Same as sklearn_nneighbor, but with n_jobs=n_jobs.
    distances = np.zeros(n)
    for i in range(1, n):
        past = big_matrix[max(0, i - window_size):i]
        nn = NearestNeighbors(metric="cosine", n_neighbors=1, n_jobs=n_jobs)
        nn.fit(X=past)
        query = big_matrix[i]
        distance, _ = nn.kneighbors(query)
        distances[i] = distance[0]


@ray.remote
def get_chunk_min(data, indices, indptr, shape, slice_i, slice_j, query):
    # Rebuild the CSR matrix from its parts inside the worker, then return the
    # minimum distance from row `query` to rows slice_i .. slice_j - 1.
    matrix = scipy.sparse.csr_matrix((data, indices, indptr), shape=shape)
    past = matrix[slice_i:slice_j]
    query = matrix[query]
    chunk_min = cosine_distances(query, past).min(axis=1)
    return chunk_min


def ray_parallel():
    distances = np.zeros(n)
    # Put the CSR components into the object store once.
    data = ray.put(big_matrix.data)
    indices = ray.put(big_matrix.indices)
    indptr = ray.put(big_matrix.indptr)
    shape = ray.put(big_matrix.shape)
    for i in range(1, n):
        # Split the window into n_jobs chunks, one remote task per chunk.
        chunk_size = math.ceil((i - max(0, i - window_size)) / n_jobs)
        chunk_mins = ray.get([
            get_chunk_min.remote(
                data,
                indices,
                indptr,
                shape,
                enum,
                min(enum + chunk_size, i),  # clamp so row i itself is never in the window
                i,
            )
            for enum in range(max(0, i - window_size), i, chunk_size)
        ])
        distances[i] = min(chunk_mins)


for method in ["simple_cosine_distance", "sklearn_nneighbor", "sklearn_nneighbor_parallel", "ray_parallel"]:
    print(method)
    print(timeit.timeit(method + "()", setup="from __main__ import " + method, number=retry))
    print("*" * 50)
Output:
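simple_cosine_distance
3.978868665999471
**************************************************
sklearn_nneighbor
4.265772191996803
**************************************************
sklearn_nneighbor_parallel
28.664759318002325
**************************************************
ray_parallel
17.89882287799992
**************************************************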
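To see where the ray version's time goes, a back-of-envelope check helps. The sketch below re-traces the loop bounds of ray_parallel (count_ray_tasks is a hypothetical helper, not part of ray) to count how many remote tasks are submitted for n = 4000, window_size = 1000 and n_jobs = 4, and divides the measured times above by the number of query rows.

```python
import math


def count_ray_tasks(n, window_size, n_jobs):
    """Re-trace ray_parallel()'s loop bounds and count get_chunk_min.remote calls."""
    total = 0
    for i in range(1, n):
        start = max(0, i - window_size)
        chunk_size = math.ceil((i - start) / n_jobs)
        total += len(range(start, i, chunk_size))
    return total


n, window_size, n_jobs = 4000, 1000, 4  # values from the question
tasks = count_ray_tasks(n, window_size, n_jobs)
print("remote tasks:", tasks)                # remote tasks: 15987
print("blocking ray.get calls:", n - 1)      # blocking ray.get calls: 3999
# Measured wall time divided by the number of query rows (seconds per row).
print("ray_parallel s/row:", 17.89882287799992 / (n - 1))
print("simple_cosine_distance s/row:", 3.978868665999471 / (n - 1))
```

Roughly four tasks are submitted per query row, and ray_parallel spends about 4.5 ms of wall time per row, against about 1.0 ms for the entire serial computation of a row, so the per-task overhead outweighs the work being split.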
Answer
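Also note that if your remote function is simple enough, the overhead of distributed computation (e.g., scheduling time and serialization time) can be larger than the computation time of the function itself. In ray_parallel, each get_chunk_min task compares a single query row with at most window_size / n_jobs = 250 rows and rebuilds the CSR matrix from data, indices and indptr, and ray.get is called once per row, so the driver waits for every batch of tasks before submitting the next one.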
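One common way to act on this is to make each remote task much coarser: put the whole matrix into the object store once, let each task compute the answers for a contiguous block of query rows with a single sparse matrix product, and call ray.get only once at the end. The sketch below assumes that approach; l2_normalize_rows, window_min_cosine_block and ray_window_min_cosine are hypothetical helpers (not part of ray, scipy or sklearn), all-zero rows are treated as having cosine similarity 0, and the code has not been benchmarked against the timings above.

```python
import numpy as np
import scipy.sparse


def l2_normalize_rows(matrix):
    """Scale each row to unit L2 norm; all-zero rows are left as zeros."""
    matrix = scipy.sparse.csr_matrix(matrix, dtype=np.float64)
    norms = np.sqrt(np.asarray(matrix.multiply(matrix).sum(axis=1)).ravel())
    inv = np.zeros_like(norms)
    np.divide(1.0, norms, out=inv, where=norms > 0)
    return (scipy.sparse.diags(inv) @ matrix).tocsr()


def window_min_cosine_block(matrix, start, stop, window_size):
    """Minimum cosine distance from each row i in [start, stop) to the rows
    [max(0, i - window_size), i). Row 0 has no predecessors and gets 0.0,
    like the zero-initialised distances array in the question."""
    lo = max(0, start - window_size)  # first row any query in the block can see
    block = l2_normalize_rows(matrix[lo:stop])
    # One sparse product for the whole block: rows = queries, columns = rows lo..stop-1.
    sims = (block[start - lo:] @ block.T).toarray()
    out = np.zeros(stop - start)
    for k, i in enumerate(range(start, stop)):
        j0 = max(0, i - window_size) - lo  # first column inside row i's window
        j1 = i - lo                        # column of row i itself (excluded)
        if j1 > j0:
            out[k] = 1.0 - sims[k, j0:j1].max()
    # Clip floating-point round-off into the valid cosine-distance range.
    return np.clip(out, 0.0, 2.0)


def ray_window_min_cosine(matrix, window_size, num_tasks):
    """Hypothetical driver: one coarse ray task per block of query rows.
    Assumes ray.init() has already been called, as in the question."""
    import ray  # imported here so the helpers above work without ray installed

    remote_block = ray.remote(window_min_cosine_block)
    matrix_ref = ray.put(matrix)  # serialise the matrix into the object store once
    bounds = np.linspace(0, matrix.shape[0], num_tasks + 1).astype(int)
    refs = [
        remote_block.remote(matrix_ref, int(a), int(b), window_size)
        for a, b in zip(bounds[:-1], bounds[1:])
        if b > a
    ]
    # A single blocking ray.get for all blocks instead of one per row.
    return np.concatenate(ray.get(refs))
```

With the question's variables, this could be called as ray_window_min_cosine(big_matrix, window_size, n_jobs) after ray.init(). Calling window_min_cosine_block(big_matrix, 0, n, window_size) directly runs the same vectorized computation in one process, which makes a useful serial baseline; note that each call materializes a dense similarity array, which for a single block over n = 4000 rows is 4000 × 4000 float64 values (about 128 MB). Whether either version beats simple_cosine_distance at this problem size would have to be measured.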