稀疏 CSR 阵列的核外处理

Posted

技术标签:

【中文标题】稀疏 CSR 阵列的核外处理【英文标题】:Out-of-core processing of sparse CSR arrays 【发布时间】:2017-12-22 01:29:02 【问题描述】:

如何在使用 Python 保存在磁盘上的稀疏 CSR 数组块上并行应用某些函数?这可以依次完成,例如通过使用joblib.dump 保存CSR 数组,使用joblib.load(.., mmap_mode="r") 打开它并逐个处理行块。使用dask 可以更有效地完成这项工作吗?

特别是,假设不需要对稀疏数组进行所有可能的核心操作,而只需要并行加载行块(每个块都是一个 CSR 数组)并对其应用一些功能(在我的例如,来自 scikit-learn 的 estimator.predict(X))。

此外,磁盘上是否有适合此任务的文件格式? Joblib 可以工作,但我不确定作为内存映射加载的 CSR 数组的(并行)性能; spark.mllib 似乎使用一些自定义稀疏存储格式(似乎没有纯 Python 解析器)或 LIBSVM 格式(根据我的经验,scikit-learn 中的解析器比 joblib.dump 慢得多).. .

注意:我已经阅读了documentation、various issues about it on https://github.com/dask/dask/,但我仍然不确定如何最好地解决这个问题。

编辑:举个更实际的例子,下面是在密集数组中工作的代码,但在使用带有this error的稀疏数组时会失败,

import numpy as np
import scipy.sparse

import joblib
import dask.array as da
from sklearn.utils import gen_batches

np.random.seed(42)
joblib.dump(np.random.rand(100000, 1000), 'X_dense.pkl')
joblib.dump(scipy.sparse.random(10000, 1000000, format='csr'), 'X_csr.pkl')

fh = joblib.load('X_dense.pkl', mmap_mode='r')

# computing the results without dask
results = np.vstack((fh[sl, :].sum(axis=1)) for sl in gen_batches(fh.shape[0], batch_size))

# computing the results with dask
x = da.from_array(fh, chunks=(2000))
results = x.sum(axis=1).compute()

Edit2:根据下面的讨论,下面的示例克服了之前的错误,但在dask/array/core.py:L3413 中得到了关于IndexError: tuple index out of range 的错误,

import dask
# +imports from the example above
dask.set_options(get=dask.get)  # disable multiprocessing

fh = joblib.load('X_csr.pkl', mmap_mode='r')

def func(x):
    if x.ndim == 0:
        # dask does some heuristics with dummy data, if the x is a 0d array
        # the sum command would fail
        return x
    res = np.asarray(x.sum(axis=1, keepdims=True))
    return res

Xd = da.from_array(fh, chunks=(2000))
results_new = Xd.map_blocks(func).compute()

【问题讨论】:

这取决于 joblib 如何在磁盘上存储数据。我怀疑他们将其存储为不透明的 blob,在这种情况下,很难分块读取。 @MRocklin 是的,他们有一个 NumpyPickler (github.com/joblib/joblib/blob/…) 将所有内容存储在一个 blob 中。对于稀疏的 CSR 数组,我认为这应该等同于将np.save 应用于X.dataX.indicesX.indptr 数组。事实上,以前版本的 joblib.dump 导致每个 numpy 数组一个文件。优点是joblib.load("<sparse array pickled file>", mmap_mode="r")[slice, :] 已经只加载了数组的一个块.. 在最新版本的scipy 中有一个sparse.savenz。对于csr 格式,它使用np.savez 来保存dict(data=matrix.data, indices=matrix.indices, indptr=matrix.indptr)。也就是说,矩阵的关键属性被保存到单独的zip 归档文件中。 “分块”负载必须从所有 3 个数组中读取。 Sparse 有 vstackhstack 但它们与 numpy 版本有很大不同。他们使用coo 属性构建了一个新矩阵。 np.load('test.npz',mmap_mode='r') 不会引发错误,但在创建 NpzFile 对象时会忽略 mmap_mode 值,因此不会执行任何操作。 【参考方案1】:

所以我对joblib或dask一无所知,更不用说您的应用程序特定数据格式了。但实际上可以在保留稀疏数据结构的同时从磁盘中分块读取稀疏矩阵。

虽然Wikipedia article for the CSR format 很好地解释了它的工作原理,但我将简要回顾一下:

一些稀疏矩阵,例如:

1 0 2
0 0 3
4 5 6

通过记住每个非零值及其所在的列来存储:

sparse.data    = 1 2 3 4 5 6  # acutal value
sparse.indices = 0 2 2 0 1 2  # number of column (0-indexed)

现在我们仍然缺少行。压缩格式只存储每行中有多少个非零值,而不是存储每个单独的值行。

请注意,非零计数也会累加,因此以下数组包含直到并包括这一行的非零值的数量。更复杂的是,数组总是以 0 开头,因此包含 num_rows+1 条目:

sparse.indptr = 0 2 3 6

所以直到并包括第二行,有 3 个非零值,即 123

既然我们已经解决了这个问题,我们就可以开始“切片”矩阵了。目标是为某些块构造dataindicesindptr 数组。假设原始的巨大矩阵存储在三个二进制文件中,我们将增量读取。我们使用生成器重复yield 一些块。

为此,我们需要知道每个块中有多少非零值,并读取相应数量的值和列索引。非零计数可以方便地从 indptr 数组中读取。这是通过从与所需块大小相对应的巨大indptr 文件中读取一些条目来实现的。 indptr 文件那部分的最后一个条目减去之前的非零值的数量,得出该块中非零的数量。所以 dataindices 数组块只是从 dataindices 大文件中分割出来的。 indptr 数组需要人为地在前面加上一个零(这就是格式想要的,不要问我:D)。

然后我们可以用块dataindicesindptr构造一个稀疏矩阵,得到一个新的稀疏矩阵。

需要注意的是,不能单独从三个数组直接重构出实际的矩阵大小。它要么是矩阵的最大列索引,要么是你运气不好,在未确定的块中没有数据。所以我们还需要传递列数。

我可能以一种相当复杂的方式解释了事情,所以只要把它当作一段不透明的代码来阅读,它实现了这样一个生成器:

import numpy as np
import scipy.sparse


def gen_batches(batch_size, sparse_data_path, sparse_indices_path, 
                sparse_indptr_path, dtype=np.float32, column_size=None):
    data_item_size = dtype().itemsize

    with open(sparse_data_path, 'rb') as data_file, \
            open(sparse_indices_path, 'rb') as indices_file, \
            open(sparse_indptr_path, 'rb') as indptr_file:
        nnz_before = np.fromstring(indptr_file.read(4), dtype=np.int32)

        while True:
            indptr_batch = np.frombuffer(nnz_before.tobytes() +
                              indptr_file.read(4*batch_size), dtype=np.int32)

            if len(indptr_batch) == 1:
                break

            batch_indptr = indptr_batch - nnz_before
            nnz_before = indptr_batch[-1]
            batch_nnz = np.asscalar(batch_indptr[-1])

            batch_data = np.frombuffer(data_file.read(
                                       data_item_size * batch_nnz), dtype=dtype)
            batch_indices = np.frombuffer(indices_file.read(
                                          4 * batch_nnz), dtype=np.int32)

            dimensions = (len(indptr_batch)-1, column_size)

            matrix = scipy.sparse.csr_matrix((batch_data, 
                           batch_indices, batch_indptr), shape=dimensions)

            yield matrix


if __name__ == '__main__':
    sparse = scipy.sparse.random(5, 4, density=0.1, format='csr', dtype=np.float32)

    sparse.data.tofile('sparse.data')        # dtype as specified above  ^^^^^^^^^^
    sparse.indices.tofile('sparse.indices')  # dtype=int32
    sparse.indptr.tofile('sparse.indptr')    # dtype=int32

    print(sparse.toarray())
    print('========')

    for batch in gen_batches(2, 'sparse.data', 'sparse.indices', 
                             'sparse.indptr', column_size=4):
        print(batch.toarray())

numpy.ndarray.tofile() 只是存储二进制数组,所以你需要记住数据格式。 scipy.sparseindicesindptr 表示为int32,因此这是对总矩阵大小的限制。

我还对代码进行了基准测试,发现 scipy csr 矩阵构造函数是小矩阵的瓶颈。您的里程可能会有所不同,这只是一个“原则证明”。

如果需要更复杂的实现,或者有些东西太晦涩,请联系我 :)

【讨论】:

以上是关于稀疏 CSR 阵列的核外处理的主要内容,如果未能解决你的问题,请参考以下文章

OpenCV大型阵列类型Mat类

相控阵雷达天线不同阵列的特性matlab仿真分析——有限扫描阵,稀疏阵,多波束阵,共形阵

阵列处理机

Foxx 服务阵列处理

RAID阵列

阵列处理性能