仅影响零值的两个稀疏矩阵的点积
Posted
技术标签:
【中文标题】仅影响零值的两个稀疏矩阵的点积【英文标题】:Dot product of two sparse matrices affecting zero values only 【发布时间】:2016-11-20 00:44:50 【问题描述】:我正在尝试计算一个简单的点积,但保留原始矩阵中的非零值不变。一个玩具例子:
import numpy as np
A = np.array([[2, 1, 1, 2],
[0, 2, 1, 0],
[1, 0, 1, 1],
[2, 2, 1, 0]])
B = np.array([[ 0.54331039, 0.41018682, 0.1582158 , 0.3486124 ],
[ 0.68804647, 0.29520239, 0.40654206, 0.20473451],
[ 0.69857579, 0.38958572, 0.30361365, 0.32256483],
[ 0.46195299, 0.79863505, 0.22431876, 0.59054473]])
期望的结果:
C = np.array([[ 2. , 1. , 1. , 2. ],
[ 2.07466874, 2. , 1. , 0.73203386],
[ 1. , 1.5984076 , 1. , 1. ],
[ 2. , 2. , 1. , 1.42925865]])
然而,实际的矩阵是稀疏的,看起来更像这样:
A = sparse.rand(250000, 1700, density=0.001, format='csr')
B = sparse.rand(1700, 1700, density=0.02, format='csr')
一种简单的方法是使用掩码索引设置值,如下所示:
mask = A != 0
C = A.dot(B)
C[mask] = A[mask]
但是,我的原始数组非常稀疏且非常大,因此通过索引分配更改它们非常缓慢。转换为 lil 矩阵有点帮助,但同样,转换本身需要很多时间。
我猜,另一种明显的方法是诉诸迭代并跳过掩码值,但我不想放弃 numpy/scipy 优化的数组乘法的好处。
一些澄清:我实际上对某种特殊情况感兴趣,其中B
总是正方形,因此A
和C
具有相同的形状。因此,如果有一个解决方案不适用于任意数组但适合我的情况,那很好。
更新:一些尝试:
from scipy import sparse
import numpy as np
def naive(A, B):
mask = A != 0
out = A.dot(B).tolil()
out[mask] = A[mask]
return out.tocsr()
def proposed(A, B):
Az = A == 0
R, C = np.where(Az)
out = A.copy()
out[Az] = np.einsum('ij,ji->i', A[R], B[:, C])
return out
%timeit naive(A, B)
1 loops, best of 3: 4.04 s per loop
%timeit proposed(A, B)
/usr/local/lib/python2.7/dist-packages/scipy/sparse/compressed.py:215: SparseEfficiencyWarning: Comparing a sparse matrix with 0 using == is inefficient, try using != instead.
/usr/local/lib/python2.7/dist-packages/scipy/sparse/coo.pyc in __init__(self, arg1, shape, dtype, copy)
173 self.shape = M.shape
174
--> 175 self.row, self.col = M.nonzero()
176 self.data = M[self.row, self.col]
177 self.has_canonical_format = True
MemoryError:
另一个更新:
Cython 不能使任何东西或多或少有用,至少在离 Python 太远的情况下。我们的想法是将点积留给 scipy,并尝试尽可能快地设置这些原始值,如下所示:
cimport cython
@cython.cdivision(True)
@cython.boundscheck(False)
@cython.wraparound(False)
cpdef coo_replace(int [:] row1, int [:] col1, float [:] data1, int[:] row2, int[:] col2, float[:] data2):
cdef int N = row1.shape[0]
cdef int M = row2.shape[0]
cdef int i, j
cdef dict d =
for i in range(M):
d[(row2[i], col2[i])] = data2[i]
for j in range(N):
if (row1[j], col1[j]) in d:
data1[j] = d[(row1[j], col1[j])]
这比我之前的第一个“幼稚”实现(使用.tolil()
)要好一些,但是按照 hpaulj 的方法,可以丢弃 lil。也许用 std::map 之类的东西替换 python dict 会有所帮助。
【问题讨论】:
mask
的形状为A
,而C
的形状不一定与A
的形状相同。所以,C[mask]
没有多大意义,因此我想我们需要更多关于你的意思的细节zero values
。
哦,当然。忘了提到在我的情况下B
是方形的,co A.dot(B).shape == A.shape
。然而,A
本身可能不是方形的。但你是对的,这不适用于任意数组。
零值是矩阵A
的值等于零。 “原始”矩阵是指A
,是的。
B
也是稀疏的吗?
是的。 (试图做出 15 个符号的有意义的评论)
【参考方案1】:
Python 不是我的主要语言,但我认为这是一个有趣的问题,我想尝试一下 :)
预赛:
import numpy
import scipy.sparse
# example matrices and sparse versions
A = numpy.array([[1, 2, 0, 1], [1, 0, 1, 2], [0, 1, 2 ,1], [1, 2, 1, 0]])
B = numpy.array([[1,2,3,4],[1,2,3,4],[1,2,3,4],[1,2,3,4]])
A_s = scipy.sparse.lil_matrix(A)
B_s = scipy.sparse.lil_matrix(B)
所以你想转换原来的问题:
C = A.dot(B)
C[A.nonzero()] = A[A.nonzero()]
到一些稀疏的东西。 只是为了解决这个问题,上面的直接“稀疏”翻译是:
C_s = A_s.dot(B_s)
C_s[A_s.nonzero()] = A_s[A_s.nonzero()]
但听起来您对此并不满意,因为它首先计算所有点积,您担心这可能效率低下。
那么,您的问题是,如果您首先找到零,并且只评估这些元素上的点积,那会更快吗? IE。对于密集矩阵,这可能类似于:
Xs, Ys = numpy.nonzero(A==0)
D = A[:]
D[Xs, Ys] = map ( lambda x,y: A[x,:].dot(B[:,y]), Xs, Ys)
让我们将其转换为稀疏矩阵。我在这里的主要绊脚石是找到“零”索引;因为A_s==0
对稀疏矩阵没有意义,所以我是这样找到它们的:
Xmax, Ymax = A_s.shape
DenseSize = Xmax * Ymax
Xgrid, Ygrid = numpy.mgrid[0:Xmax, 0:Ymax]
Ygrid = Ygrid.reshape([DenseSize,1])[:,0]
Xgrid = Xgrid.reshape([DenseSize,1])[:,0]
AllIndices = numpy.array([Xgrid, Ygrid])
NonzeroIndices = numpy.array(A_s.nonzero())
ZeroIndices = numpy.array([x for x in AllIndices.T.tolist() if x not in NonzeroIndices.T.tolist()]).T
如果您知道更好/更快的方法,请务必尝试一下。一旦我们有了零索引,我们就可以像以前一样进行类似的映射:
D_s = A_s[:]
D_s[ZeroIndices[0], ZeroIndices[1]] = map ( lambda x, y : A_s[x,:].dot(B[:,y])[0], ZeroIndices[0], ZeroIndices[1] )
这会为您提供稀疏矩阵结果。
现在我不知道这是否更快。我主要是尝试一下,因为这是一个有趣的问题,看看我是否可以在 python 中做到这一点。事实上,我怀疑它可能不会比直接全矩阵点积更快,因为它使用列表理解和映射在大型数据集上(就像你说的,你期望很多零)。但它是你的问题的答案“我怎样才能只计算零值的点积而不将矩阵作为一个整体相乘”。我很想看看你是否真的尝试一下它在数据集上的速度比较。
编辑:我在下面放了一个基于上述内容的示例“块处理”版本,我认为它应该允许您毫无问题地处理您的大型数据集。让我知道它是否有效。
import numpy
import scipy.sparse
# example matrices and sparse versions
A = numpy.array([[1, 2, 0, 1], [1, 0, 1, 2], [0, 1, 2 ,1], [1, 2, 1, 0]])
B = numpy.array([[1,2,3,4],[1,2,3,4],[1,2,3,4],[1,2,3,4]])
A_s = scipy.sparse.lil_matrix(A)
B_s = scipy.sparse.lil_matrix(B)
# Choose a grid division (i.e. how many processing blocks you want to create)
BlockGrid = numpy.array([2,2])
D_s = A_s[:] # initialise from A
Xmax, Ymax = A_s.shape
BaseBSiz = numpy.array([Xmax, Ymax]) / BlockGrid
for BIndX in range(0, Xmax, BlockGrid[0]):
for BIndY in range(0, Ymax, BlockGrid[1]):
BSizX, BSizY = D_s[ BIndX : BIndX + BaseBSiz[0], BIndY : BIndY + BaseBSiz[1] ].shape
Xgrid, Ygrid = numpy.mgrid[BIndX : BIndX + BSizX, BIndY : BIndY + BSizY]
Xgrid = Xgrid.reshape([BSizX*BSizY,1])[:,0]
Ygrid = Ygrid.reshape([BSizX*BSizY,1])[:,0]
AllInd = numpy.array([Xgrid, Ygrid]).T
NZeroInd = numpy.array(A_s[Xgrid, Ygrid].reshape((BSizX,BSizY)).nonzero()).T + numpy.array([[BIndX],[BIndY]]).T
ZeroInd = numpy.array([x for x in AllInd.tolist() if x not in NZeroInd.tolist()]).T
#
# Replace zero-values in current block
D_s[ZeroInd[0], ZeroInd[1]] = map ( lambda x, y : A_s[x,:].dot(B[:,y])[0], ZeroInd[0], ZeroInd[1] )
【讨论】:
感谢您的解决方案!不幸的是,稀疏性和大型数据集在这个问题中很重要:您的代码在np.mgrid[0:Xmax, 0:Ymax]
上立即遇到 MemoryError,试图分配 2 x 250000 x 1700
float32 单元格。 :-)
hm ...也许有更好的方法来获取零值的索引?理论上你可以从矩阵的密集版本中获取它们......但这意味着首先创建矩阵的密集版本,我假设你没有可用的(除非你这样做,并且您只需转换为稀疏矩阵以用于算法/优化目的)。
此外,您可以通过将此网格拆分为内存友好的块并以非常小的循环遍历所有块来避免内存错误。
@rocknrollnerd 根据我上面的评论,我编辑了我的答案,以添加基于先前方法的块处理方法的玩具示例。让我知道这是否适合您。【参考方案2】:
naive
代码的可能更简洁、更快速的版本:
In [57]: r,c=A.nonzero() # this uses A.tocoo()
In [58]: C=A*B
In [59]: Cl=C.tolil()
In [60]: Cl[r,c]=A.tolil()[r,c]
In [61]: Cl.tocsr()
C[r,c]=A[r,c]
给出了效率警告,但我认为这更多是为了让人们循环执行这种任务。
In [63]: %%timeit C=A*B
...: C[r,c]=A[r,c]
...
The slowest run took 7.32 times longer than the fastest....
1000 loops, best of 3: 334 µs per loop
In [64]: %%timeit C=A*B
...: Cl=C.tolil()
...: Cl[r,c]=A.tolil()[r,c]
...: Cl.tocsr()
...:
100 loops, best of 3: 2.83 ms per loop
我的 A
很小,只有 (250,100),但看起来往返 lil
并不能节省时间,尽管有警告。
当A
稀疏时,用A==0
屏蔽必然会出现问题
In [66]: Az=A==0
....SparseEfficiencyWarning...
In [67]: r1,c1=Az.nonzero()
与A
的nonzero
r
相比,这个r1
要大得多——稀疏矩阵中全零的行索引;除了 25 个非零值之外的所有内容。
In [70]: r.shape
Out[70]: (25,)
In [71]: r1.shape
Out[71]: (24975,)
如果我用r1
索引A
,我会得到一个更大的数组。实际上,我正在重复每一行中的零个数
In [72]: A[r1,:]
Out[72]:
<24975x100 sparse matrix of type '<class 'numpy.float64'>'
with 2473 stored elements in Compressed Sparse Row format>
In [73]: A
Out[73]:
<250x100 sparse matrix of type '<class 'numpy.float64'>'
with 25 stored elements in Compressed Sparse Row format>
我将非零元素的形状和数量增加了大约 100(列数)。
定义foo
,并复制 Divakar 的测试:
def foo(A,B):
r,c = A.nonzero()
C = A*B
C[r,c] = A[r,c]
return C
In [83]: timeit naive(A,B)
100 loops, best of 3: 2.53 ms per loop
In [84]: timeit proposed(A,B)
/...
SparseEfficiencyWarning)
100 loops, best of 3: 4.48 ms per loop
In [85]: timeit foo(A,B)
...
SparseEfficiencyWarning)
100 loops, best of 3: 2.13 ms per loop
所以我的版本有适度的速度改进。正如 Divakar 发现的那样,改变稀疏性会改变相对优势。我希望尺寸也会改变它们。
A.nonzero
使用coo
格式的事实表明,使用该格式构造新数组可能是可行的。许多sparse
代码通过coo
值构建一个新矩阵。
In [97]: Co=C.tocoo()
In [98]: Ao=A.tocoo()
In [99]: r=np.concatenate((Co.row,Ao.row))
In [100]: c=np.concatenate((Co.col,Ao.col))
In [101]: d=np.concatenate((Co.data,Ao.data))
In [102]: r.shape
Out[102]: (79,)
In [103]: C1=sparse.csr_matrix((d,(r,c)),shape=A.shape)
In [104]: C1
Out[104]:
<250x100 sparse matrix of type '<class 'numpy.float64'>'
with 78 stored elements in Compressed Sparse Row format>
我认为这个C1
具有与通过其他方式构造的C
相同的非零元素。但我认为一个值是不同的,因为r
更长。在这个特定的示例中,C
和 A
共享一个非零元素,coo
样式的输入将它们相加,而我们希望 A
值覆盖所有内容。
如果你能容忍这种差异,这是一种更快的方法(至少对于这个测试用例):
def bar(A,B):
C=A*B
Co=C.tocoo()
Ao=A.tocoo()
r=np.concatenate((Co.row,Ao.row))
c=np.concatenate((Co.col,Ao.col))
d=np.concatenate((Co.data,Ao.data))
return sparse.csr_matrix((d,(r,c)),shape=A.shape)
In [107]: timeit bar(A,B)
1000 loops, best of 3: 1.03 ms per loop
【讨论】:
好吧,我想我不会比这更好了。 :-) 谢谢,我会接受的。 ...有趣的是,我已经返回到我的生产数据集,tolil()
有很大帮助 - 没有它,C[r,c] = A[r,c]
行运行几分钟而不是大约 14 秒(带 lil) .我在矩阵中有一些密集的列,也许这就是原因。
我在另一个 SO question 中发现 scikit-learn 添加了一些快速的 csr 实用程序。 scikit-learn.org/stable/developers/utilities.html【参考方案3】:
破解了!好吧,我在此过程中学到了很多针对稀疏矩阵的 scipy 内容。这是我可以召集的实现 -
# Find the indices in output array that are to be updated
R,C = ((A!=0).dot(B!=0)).nonzero()
mask = np.asarray(A[R,C]==0).ravel()
R,C = R[mask],C[mask]
# Make a copy of A and get the dot product through sliced rows and columns
# off A and B using the definition of matrix-multiplication
out = A.copy()
out[R,C] = (A[R].multiply(B[:,C].T).sum(1)).ravel()
最昂贵的部分似乎是元素乘法和求和。在一些快速的时序测试中,在具有高度稀疏性的稀疏矩阵上,这似乎会在性能方面击败最初的基于点掩码的解决方案,我认为这是因为它对内存效率的关注。
运行时测试
函数定义-
def naive(A, B):
mask = A != 0
out = A.dot(B).tolil()
out[mask] = A[mask]
return out.tocsr()
def proposed(A, B):
R,C = ((A!=0).dot(B!=0)).nonzero()
mask = np.asarray(A[R,C]==0).ravel()
R,C = R[mask],C[mask]
out = A.copy()
out[R,C] = (A[R].multiply(B[:,C].T).sum(1)).ravel()
return out
时间安排 -
In [57]: # Input matrices
...: M,N = 25000, 170
...: A = sparse.rand(M, N, density=0.001, format='csr')
...: B = sparse.rand(N, N, density=0.02, format='csr')
...:
In [58]: %timeit naive(A, B)
10 loops, best of 3: 92.2 ms per loop
In [59]: %timeit proposed(A, B)
10 loops, best of 3: 132 ms per loop
In [60]: # Input matrices with increased sparse-ness
...: M,N = 25000, 170
...: A = sparse.rand(M, N, density=0.0001, format='csr')
...: B = sparse.rand(N, N, density=0.002, format='csr')
...:
In [61]: %timeit naive(A, B)
10 loops, best of 3: 78.1 ms per loop
In [62]: %timeit proposed(A, B)
100 loops, best of 3: 8.03 ms per loop
【讨论】:
谢谢!不幸的是,这遇到了稀疏问题。我做了一个更现实的例子,它看起来像我使用的矩阵(都是稀疏的,具有相当高的稀疏度)。请查看更新后的问题。 稀疏行或列索引可以通过矩阵乘法来完成(实际上已经完成了?)。A[R,:] == A*I
一些稀疏的I
。
@hpaulj 不,不可能,至少不能这么简单。我需要更新我的解决方案以合并稀疏案例,在问题更新之前我没有注意到。
@rocknrollnerd 请查看编辑。哇,解决这个问题真是太棒了!
@Divakar 太好了,谢谢!不幸的是,我需要大约这种程度的稀疏性。如果我的 Cython 方法不起作用,我会接受你的回答。以上是关于仅影响零值的两个稀疏矩阵的点积的主要内容,如果未能解决你的问题,请参考以下文章
稀疏矩阵压缩存储:CSR/CSC (Compress Sparse Row/Column)