如何在 numpy 中进行分散和收集操作?

Posted

技术标签:

【中文标题】如何在 numpy 中进行分散和收集操作?【英文标题】:How to do scatter and gather operations in numpy? 【发布时间】:2018-02-14 09:26:00 【问题描述】:

我想在 Numpy 中实现 Tensorflow 或 PyTorch 的分散和聚集操作。我一直在挠头。任何指针都非常感谢!

PyTorch Scatter

PyTorch Gather

【问题讨论】:

我怀疑有问题的代码是开源的... 看起来这些方法是 C++ 方法的 Python 前端。如果您需要numpy 专家的帮助,您需要解释他们的工作。换句话说,用numpy 术语(举例)说明你想做什么。没有pytorch 的经验,我无法轻松理解文档。 @MadPhysicist 是的,代码是开源的。你可以在这里查看。这是一个非常酷的项目:openmined.org @hpaulj 解释在 Pytorch 和 TF 文档中。一键即可!它们是机器学习和数据科学的非常常见的方法,所以我有一种预感,一个 numpy 专家已经知道答案了。这些方法是 c++ 方法的 Python 前端这一事实在这里无关紧要。为什么投反对票? scatter 是 numpy.put_along_axis,gather 是 numpy.take_along_axis。 【参考方案1】:

有两个内置的 numpy 函数可以满足您的要求。

可以使用np.take_along_axis实现torch.gather,使用np.put_along_axis实现torch.scatter

【讨论】:

【参考方案2】:

scatter_nd 操作可以使用*np*'s ufuncs .at 函数来实现。

根据 TF scatter_nd's doc:

调用tf.scatter_nd(indices, values, shape)tensor_scatter_add(tf.zeros(shape, values.dtype), indices, values) 相同。

因此,您可以使用应用于np.zeros 数组的np.add.at 来重现tf.scatter_nd,请参阅下面的MVCE:

import tensorflow as tf
tf.enable_eager_execution() # Remove this line if working in TF2
import numpy as np

def scatter_nd_numpy(indices, updates, shape):
    target = np.zeros(shape, dtype=updates.dtype)
    indices = tuple(indices.reshape(-1, indices.shape[-1]).T)
    updates = updates.ravel()
    np.add.at(target, indices, updates)
    return target

indices = np.array([[[0, 0], [0, 1]], [[1, 0], [1, 1]]])
updates = np.array([[1, 2], [3, 4]])
shape = (2, 3)

scattered_tf = tf.scatter_nd(indices, updates, shape).numpy()
scattered_np = scatter_nd_numpy(indices, updates, shape)

assert np.allclose(scattered_tf, scattered_np)

注意:正如@denis 所指出的,当重复某些索引时,上述解决方案会有所不同,这可以通过使用计数器并仅获取每个重复索引的最后一个来解决。

【讨论】:

【参考方案3】:

如果您只是想要相同的功能而不是从头开始实现它,

numpy.insert() 是 pytorch 中 scatter_(dim, index, src) 操作的有力竞争者,但它只处理单个维度。

【讨论】:

【参考方案4】:

收集操作:np.take()

https://docs.scipy.org/doc/numpy-1.14.0/reference/generated/numpy.take.html

【讨论】:

gatheris 与 np.take 不同,当尝试使用 np.take 解决 this 问题时会很清楚。【参考方案5】:

我做到了。

def gather(a, dim, index):
    expanded_index = [index if dim==i else np.arange(a.shape[i]).reshape([-1 if i==j else 1 for j in range(a.ndim)]) for i in range(a.ndim)]
    return a[expanded_index]

def scatter(a, dim, index, b): # a inplace
    expanded_index = [index if dim==i else np.arange(a.shape[i]).reshape([-1 if i==j else 1 for j in range(a.ndim)]) for i in range(a.ndim)]
    a[expanded_index] = b

【讨论】:

【参考方案6】:

scatter 方法比我预期的要多得多。我没有在 NumPy 中找到任何现成的函数。我在这里分享它是为了任何可能需要使用 NumPy 实现它的人的利益。 (ps self 是方法的目的地或输出。)

def scatter_numpy(self, dim, index, src):
    """
    Writes all values from the Tensor src into self at the indices specified in the index Tensor.

    :param dim: The axis along which to index
    :param index: The indices of elements to scatter
    :param src: The source element(s) to scatter
    :return: self
    """
    if index.dtype != np.dtype('int_'):
        raise TypeError("The values of index must be integers")
    if self.ndim != index.ndim:
        raise ValueError("Index should have the same number of dimensions as output")
    if dim >= self.ndim or dim < -self.ndim:
        raise IndexError("dim is out of range")
    if dim < 0:
        # Not sure why scatter should accept dim < 0, but that is the behavior in PyTorch's scatter
        dim = self.ndim + dim
    idx_xsection_shape = index.shape[:dim] + index.shape[dim + 1:]
    self_xsection_shape = self.shape[:dim] + self.shape[dim + 1:]
    if idx_xsection_shape != self_xsection_shape:
        raise ValueError("Except for dimension " + str(dim) +
                         ", all dimensions of index and output should be the same size")
    if (index >= self.shape[dim]).any() or (index < 0).any():
        raise IndexError("The values of index must be between 0 and (self.shape[dim] -1)")

    def make_slice(arr, dim, i):
        slc = [slice(None)] * arr.ndim
        slc[dim] = i
        return slc

    # We use index and dim parameters to create idx
    # idx is in a form that can be used as a NumPy advanced index for scattering of src param. in self
    idx = [[*np.indices(idx_xsection_shape).reshape(index.ndim - 1, -1),
            index[make_slice(index, dim, i)].reshape(1, -1)[0]] for i in range(index.shape[dim])]
    idx = list(np.concatenate(idx, axis=1))
    idx.insert(dim, idx.pop())

    if not np.isscalar(src):
        if index.shape[dim] > src.shape[dim]:
            raise IndexError("Dimension " + str(dim) + "of index can not be bigger than that of src ")
        src_xsection_shape = src.shape[:dim] + src.shape[dim + 1:]
        if idx_xsection_shape != src_xsection_shape:
            raise ValueError("Except for dimension " +
                             str(dim) + ", all dimensions of index and src should be the same size")
        # src_idx is a NumPy advanced index for indexing of elements in the src
        src_idx = list(idx)
        src_idx.pop(dim)
        src_idx.insert(dim, np.repeat(np.arange(index.shape[dim]), np.prod(idx_xsection_shape)))
        self[idx] = src[src_idx]

    else:
        self[idx] = src

    return self

gather 可能有一个更简单的解决方案,但这是我决定的:(这里 self 是从中收集值的 ndarray。)

def gather_numpy(self, dim, index):
    """
    Gathers values along an axis specified by dim.
    For a 3-D tensor the output is specified by:
        out[i][j][k] = input[index[i][j][k]][j][k]  # if dim == 0
        out[i][j][k] = input[i][index[i][j][k]][k]  # if dim == 1
        out[i][j][k] = input[i][j][index[i][j][k]]  # if dim == 2

    :param dim: The axis along which to index
    :param index: A tensor of indices of elements to gather
    :return: tensor of gathered values
    """
    idx_xsection_shape = index.shape[:dim] + index.shape[dim + 1:]
    self_xsection_shape = self.shape[:dim] + self.shape[dim + 1:]
    if idx_xsection_shape != self_xsection_shape:
        raise ValueError("Except for dimension " + str(dim) +
                         ", all dimensions of index and self should be the same size")
    if index.dtype != np.dtype('int_'):
        raise TypeError("The values of index must be integers")
    data_swaped = np.swapaxes(self, 0, dim)
    index_swaped = np.swapaxes(index, 0, dim)
    gathered = np.choose(index_swaped, data_swaped)
    return np.swapaxes(gathered, 0, dim)

【讨论】:

【参考方案7】:

Fore refindices 是 numpy 数组:

散点图更新:

ref[indices] = updates          # tf.scatter_update(ref, indices, updates)
ref[:, indices] = updates       # tf.scatter_update(ref, indices, updates, axis=1)
ref[..., indices, :] = updates  # tf.scatter_update(ref, indices, updates, axis=-2)
ref[..., indices] = updates     # tf.scatter_update(ref, indices, updates, axis=-1)

收集:

ref[indices]          # tf.gather(ref, indices)
ref[:, indices]       # tf.gather(ref, indices, axis=1)
ref[..., indices, :]  # tf.gather(ref, indices, axis=-2)
ref[..., indices]     # tf.gather(ref, indices, axis=-1)

请参阅numpy docs on indexing 了解更多信息。

【讨论】:

在您的解决方案中,您如何定义要分散 src 的维度? 这对于 PyTorch 是不正确的。以下是不同结果的示例:gist.github.com/kylemcdonald/05f8c9aaef2cda9fe5bcfd74652db1a8 这与收集功能不同,可以看到here ...那个答案不使用聚集。【参考方案8】:

对于散射,而不是像@DomJack 建议的那样使用切片分配,通常最好使用 np.add.at;因为与切片分配不同,这在存在重复索引的情况下具有明确定义的行为。

【讨论】:

定义明确是什么意思?我的理解是在 PyTorch 和 Tensorflow 中,重复索引会导致覆盖值。在 TF 的情况下,他们特别警告更新的顺序不是确定性的。我查看了 np.add.at,它似乎对“scatter_add”操作有好处(不是吗?),但这不是我想要的行为。 add.at(a, [0,0], 1) 将增加第一个元素两次”。如果你想设置最后一个值,比如 np.set.at 怎么办?但是 afaik 没有 ufunc 可以设置。 (破解 X >= 0:np.abs.at( X, ix, values ) 。)

以上是关于如何在 numpy 中进行分散和收集操作?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 python 中读取多个 wav 文件,并转换为 numpy 数组进行绘图

Java NIO学习笔记 三 散点/收集 和频道转换

有效地收集/分散任务

如何在 numpy 中实现定点二进制支持

flume 1.7 的配置

NumPy解释线性代数