pyspark 计算稀疏向量的距离矩阵

Posted

技术标签:

【中文标题】pyspark 计算稀疏向量的距离矩阵【英文标题】:pyspark calculate distance matrix of sparse vectors 【发布时间】:2017-08-08 12:32:30 【问题描述】:

我正在尝试构建一种通用方法来计算许多稀疏向量(长度为 250k 的 100k 向量)的距离矩阵。在我的示例中,数据以 scipy csr 矩阵表示。这就是我正在做的:

首先,我定义了一种将 csr 行转换为 pyspark SparseVectors 的方法:

def csr_to_sparse_vector(row):
    return SparseVector(row.shape[1], sorted(row.indices), row.data)

现在我将行转换为向量并将它们保存到一个列表中,然后将其提供给 SparkContext:

sparse_vectors = [csr_to_sparse_vector(row) for row in refs_sample]
rdd = sc.parallelize(sparse_vectors)

在下一步中,我使用笛卡尔函数来构建所有对(类似于这篇文章:Pyspark calculate custom distance between all vectors in a RDD)

在这个实验中,我想使用相应定义的 tje Jaccard Similarity:

def jacc_sim(pair):
    dot_product = pair[0].dot(pair[1])
    try:
        sim = dot_product / (pair[0].numNonzeros() + pair[1].numNonzeros())
    except ZeroDivisionError:
        return 0.0
    return sim

现在我应该只映射函数并收集结果:

distance_matrix = rdd2.map(lambda x: jacc_sim(x)).collect()

我在一个只有 100 个文档的小样本上运行此代码,一个本地机器和一个具有 180 个节点的集群。该任务需要很长时间并最终崩溃:https://pastebin.com/UwLUXvUZ

有什么建议可能是错的吗?

此外,如果距离度量是对称的 sim(x,y) == sim(y,x),我们只需要矩阵的上三角形。我发现一个帖子通过过滤解决了这个问题(Upper triangle of cartesian in spark for symmetric operations: `x*(x+1)//2` instead of `x**2`):

rdd2 = rdd.cartesian(rdd).filter(lambda x: x[0] < x[1])

但这不适用于 SparseVectors 列表。

【问题讨论】:

【参考方案1】:

问题是配置错误导致我的数据分成 1000 个分区。解决方案只是明确告诉 spark 他应该创建多少个分区(例如 10 个):

rdd = sc.parallelize(sparse_vectors, 10)

此外,我通过枚举扩展了稀疏向量列表,这样我就可以过滤掉不属于上三角矩阵的对:

sparse_vectors = [(i, csr_to_sparse_vector(row)) for i, row in enumerate(authors)]
rdd = sc.parallelize(sparse_vectors, 10)
rdd2 = rdd.cartesian(rdd).filter(lambda x: x[0][0] < x[1][0])
rdd2.map(lambda x: jacc_sim(x)).filter(lambda x: x is not None).saveAsTextFile('hdfs:///user/username/similarities')

所属相似度函数如下:

def jacc_sim(pair):
    id_0 = pair[0][0]
    vec_0 = pair[0][1]
    id_1 = pair[1][0]
    vec_1 = pair[1][1]
    dot_product = vec_0.dot(vec_1)
    try:
        sim = dot_product / (vec_0.numNonzeros() + vec_1.numNonzeros())
        if sim > 0:
            return (id_0, id_1, sim)
    except ZeroDivisionError:
        pass
    return None

这对我来说效果很好,我希望其他人也会觉得它有用!

【讨论】:

【参考方案2】:

是列表有问题,还是 SparseVectors 构成了列表?一个想法是尝试将 SparseVectors 转换为 DenseVectors,这是我在此处找到的建议 (Convert Sparse Vector to Dense Vector in Pyspark)。计算结果没有什么不同,只是 Spark 的处理方式。

【讨论】:

Hej @MisterJT,感谢您抽出宝贵时间。我的 spark 配置有问题,导致崩溃。 @nadre,很高兴你找到它。配置是特定于 spark 库还是特定于您的机器。

以上是关于pyspark 计算稀疏向量的距离矩阵的主要内容,如果未能解决你的问题,请参考以下文章

哪个 SciPy 稀疏矩阵类最适合计算距离矩阵?

在 Pyspark ML 中的稀疏向量数据类型列上创建 Python 转换器

Pyspark 计算 RDD 中所有向量之间的自定义距离

在 cython 中快速访问稀疏矩阵:memoryview 与字典向量

pyspark:将稀疏局部矩阵转换为 RDD

在 groupby 操作 PySpark 中聚合列中的稀疏向量