Python/NumPy:实现运行总和(但不完全)

Posted

技术标签:

【中文标题】Python/NumPy:实现运行总和(但不完全)【英文标题】:Python/NumPy: implementing a running sum (but not quite) 【发布时间】:2012-04-16 01:40:22 【问题描述】:

给定两个长度相等的数组,一个保存数据,一个保存结果但最初设置为零,例如:

a = numpy.array([1, 0, 0, 1, 0, 1, 0, 0, 1, 1])
b = numpy.array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

我想计算 a 中三个相邻元素的所有可能子集的总和。如果和为0或1,则b中对应的三个元素保持不变;只有总和超过1时,b中对应的三个元素才设为1,这样计算后b就变成了

array([0, 0, 0, 1, 1, 1, 0, 1, 1, 1])

一个简单的循环就可以完成这个:

for x in range(len(a)-2):
    if a[x:x+3].sum() > 1:
        b[x:x+3] = 1

在此之后,b 具有所需的形式。

我必须为大量数据执行此操作,因此速度是一个问题。 NumPy中是否有更快的方法来执行上述操作?

(我知道这类似于卷积,但不完全相同)。

【问题讨论】:

【参考方案1】:

你可以从卷积开始,选择超过1的值,最后使用“dilation”:

b = numpy.convolve(a, [1, 1, 1], mode="same") > 1
b = b | numpy.r_[0, b[:-1]] | numpy.r_[b[1:], 0]

由于这避免了 Python 循环,它应该比你的方法更快,但我没有做计时。

另一种方法是使用第二个卷积来扩张:

kernel = [1, 1, 1]
b = numpy.convolve(a, kernel, mode="same") > 1
b = numpy.convolve(b, kernel, mode="same") > 0

如果您有可用的 SciPy,则另一个扩张选项是

b = numpy.convolve(a, [1, 1, 1], mode="same") > 1
b = scipy.ndimage.morphology.binary_dilation(b)

编辑:通过some timings,我发现这个解决方案对于大型数组来说似乎是最快的:

b = numpy.convolve(a, kernel) > 1
b[:-1] |= b[1:]  # Shift and "smearing" to the *left* (smearing with b[1:] |= b[:-1] does not work)
b[:-1] |= b[1:]  # … and again!
b = b[:-2]

对于一百万个条目的数组,它比您在我的机器上使用的原始方法快 200 多倍。正如 EOL 在 cmets 中指出的那样,这个解决方案可能被认为有点脆弱,因为它取决于 NumPy 的实现细节。

【讨论】:

完全符合我的建议,但要快 30 秒。 ;) 在 OP 的 a 上,这实际上更慢,但随着数组的增长,它似乎变得更好。 +1:NumPy 的功能在这里得到了很好的利用。优雅高效的代码。 @SvenMarnach:我修复了您的最后一个解决方案,但结果不正确。这是相当微妙的,但b[1:] |= b[:-1] 给出的结果与b[:-1] |= b[1:] 不同:第一种形式没有达到您的预期,因为计算中使用了更新的 b 值。最后一个解决方案似乎依赖于实现,因为 b 在使用时被修改:它仅在 NumPy 内部循环在 OR 计算期间不破坏原始 b 值及其更新值时才有效,这取决于 NumPy 内部是否循环是否从第一个元素到最后一个元素。 @SvenMarnach:使用kernel = numpy.array([True]*3) 可以直接用b = numpy.convolve(b, kernel, mode="same") 结束第二个解决方案(无需对结果进行另一个循环来计算> 0)。这稍微快一些(可能快 10 %)。【参考方案2】:

您可以通过以下方式有效地计算“卷积”总和:

>>> a0 = a[:-2]
>>> a1 = a[1:-1]
>>> a2 = a[2:]
>>> a_large_sum = a0 + a1 + a2 > 1

更新b 可以通过编写意味着“至少三个相邻的a_large_sum 值中的一个为真”来有效地完成:您首先将a_large_sum 数组扩展回与@ 相同数量的元素987654325@(向右,向左,向右,再向左):

>>> a_large_sum_0 = np.hstack([a_large_sum, [False, False]])
>>> a_large_sum_1 = np.hstack([[False], a_large_sum, [False]])
>>> a_large_sum_2 = np.hstack([[False, False], a_large_sum])

然后您以有效的方式获得b

>>> b = a_large_sum_0 | a_large_sum_1 | a_large_sum_2

这给出了您获得的结果,但以一种非常有效的方式,通过利用 NumPy 内部快速循环。

PS:这种方法本质上与 Sven 的第一个解决方案相同,但比 Sven 的优雅代码更乏味;然而,它同样快。 Sven 的第二个解决方案 (double convolve()) 更加优雅,并且速度提高了一倍。

【讨论】:

感谢大家的有用回复。我不懂一些语法,但我确实了解双卷积 - 非常好!我明天实现它,看看速度提升。【参考方案3】:

您可能还想看看 NumPy 的 stride_tricks。使用 Sven 的计时设置(请参阅 Sven 的答案中的链接),我发现对于(非常)大的数组,这也是一种快速的方法来做你想做的事(即使用你对 a 的定义):

shape = (len(a)-2,3)
strides = a.strides+a.strides
a_strided = numpy.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)
b = np.r_[numpy.sum(a_strided, axis=-1) > 1, False, False]
b[2:] |= b[1:-1] | b[:-2]

编辑后(见下面的 cmets)它不再是最快的方法了。

这会在您的原始数组上创建一个特别跨步的视图。 a 中的数据不会被复制,只是以新的方式查看。我们基本上想要创建一个新数组,其中最后一个索引包含我们想要求和的子数组(即您想要求和的三个元素)。这样我们就可以很容易的和最后一个命令求和了。

因此,这个新形状的最后一个元素必须是3,第一个元素将是旧a 的长度减2(因为我们只能求和-2nd 元素)。

strides 列表包含新数组a_strided 为到达形状的每个维度中的下一个元素所需的步幅(以字节为单位)。如果将它们设置为相等,则意味着a_strided[0,1]a_strided[1,0] 都将是a[1],这正是我们想要的。在普通数组中,情况并非如此(第一个步幅将是“第一维的大小乘以数组第一维的长度(= shape[0])”),但在这种情况下,我们可以好好利用它。

不确定我是否解释得很好,但只需打印出 a_strided,您就会看到结果是什么以及这使操作变得多么容易。

【讨论】:

有趣。我猜一个简单的len(a)就相当于你的a.shape[0],在这种情况下,不是吗? 到最后,您的意思是“second 步幅将是“size-of-...”...”,对吧?第一个步幅只是单个元素的大小(以字节为单位)。 请注意,您的答案只给出了一半的答案:求和数组中的值必须用于创建一个新的b 数组,就像原始问题中一样。您使用什么代码进行时序测试? @1st:是的。 @2nd:不,我不这么认为。当然,这取决于内存顺序,但如果你尝试例如np.ones((3,3)).strides,您会看到默认 NumPy 顺序中的第一个步幅更大。 @3rd:啊,当然,显然没有很好地阅读问题。这会添加额外的代码,使其比其他选项稍慢。我会编辑答案,谢谢 EOL。 感谢 egpbos。我的错误,关于@2nd。

以上是关于Python/NumPy:实现运行总和(但不完全)的主要内容,如果未能解决你的问题,请参考以下文章

Python Numpy累积/差异[重复]

numpy sum 给出错误

Python Numpy详解

(Python)numpy 常用操作

如何将复数从 python numpy 传递给 c(目前正在尝试使用 SWIG)

总和 MADlib UDF Spark SQL