就地映射 NumPy 数组

Posted

技术标签:

【中文标题】就地映射 NumPy 数组【英文标题】:Mapping a NumPy array in place 【发布时间】:2011-10-13 00:57:10 【问题描述】:

是否可以在原地映射 NumPy 数组?如果是,如何?

鉴于a_values - 2D 数组 - 这是目前对我有用的代码:

for row in range(len(a_values)):
    for col in range(len(a_values[0])):
        a_values[row][col] = dim(a_values[row][col])

但它太丑了,我怀疑在 NumPy 中的某个地方一定有一个函数可以对类似的东西做同样的事情:

a_values.map_in_place(dim)

但如果存在上述类似的东西,我一直找不到它。

【问题讨论】:

你可以做 a_values = np.vectorize(dim)(a_values) 并避免嵌套循环,但这仍然没有到位,所以这不是答案。 我不知道有什么函数可以做到这一点,但如果有的话,它只会让代码看起来更整洁。如果你想要 Numpy 特有的性能加速,那么你需要重写 dim() 函数来直接处理 numpy 数组。 @eryksun 是的,但它仍然没有到位操作,所以它并没有好多少,并且可能会导致我所说的额外副本 我做出了我认为是滥用vectorize 的勇敢尝试,但我现在放弃了。借调鲍勃。 @senderle - 不管它值多少钱,你的英勇尝试似乎对我来说是完美的......(而且很漂亮,考虑到所有因素)出于模糊的好奇,它哪里出错了? 【参考方案1】:

只有在空间有限的情况下,才值得尝试就地执行此操作。如果是这种情况,可以通过迭代数组的扁平视图来稍微加快代码速度。由于reshape 返回一个新视图when possible,因此不会复制数据本身(除非原始数据具有不寻常的结构)。

我不知道有更好的方法来实现任意 Python 函数的真正就地应用。

>>> def flat_for(a, f):
...     a = a.reshape(-1)
...     for i, v in enumerate(a):
...         a[i] = f(v)
... 
>>> a = numpy.arange(25).reshape(5, 5)
>>> flat_for(a, lambda x: x + 5)
>>> a

array([[ 5,  6,  7,  8,  9],
       [10, 11, 12, 13, 14],
       [15, 16, 17, 18, 19],
       [20, 21, 22, 23, 24],
       [25, 26, 27, 28, 29]])

一些时间安排:

>>> a = numpy.arange(2500).reshape(50, 50)
>>> f = lambda x: x + 5
>>> %timeit flat_for(a, f)
1000 loops, best of 3: 1.86 ms per loop

它大约是嵌套循环版本的两倍:

>>> a = numpy.arange(2500).reshape(50, 50)
>>> def nested_for(a, f):
...     for i in range(len(a)):
...         for j in range(len(a[0])):
...             a[i][j] = f(a[i][j])
... 
>>> %timeit nested_for(a, f)
100 loops, best of 3: 3.79 ms per loop

当然,vectorize 仍然更快,所以如果你可以复制,就使用它:

>>> a = numpy.arange(2500).reshape(50, 50)
>>> g = numpy.vectorize(lambda x: x + 5)
>>> %timeit g(a)
1000 loops, best of 3: 584 us per loop

如果你可以使用内置的 ufunc 重写dim,那么请不要vectorize

>>> a = numpy.arange(2500).reshape(50, 50)
>>> %timeit a + 5
100000 loops, best of 3: 4.66 us per loop

numpy 执行+= 之类的操作,正如您所期望的那样——因此您可以免费获得具有就地应用程序的 ufunc 的速度。有时甚至更快!示例见here。


顺便说一句,我对这个问题的原始答案(可以在其编辑历史中查看)是荒谬的,并且涉及将索引矢量化为a。它不仅需要做一些时髦的事情来绕过vectorize 的type-detection mechanism,而且结果和嵌套循环版本一样慢。这么多聪明!

【讨论】:

感谢您的支持 (+1) - 我将在今天晚些时候开始工作时对其进行测试。至于为什么我需要这个....数组由pygame.surfarray.pixels2d 在内部使用。该数组是对图像像素值的引用,而不是它的副本,因此如果我想在场景中修改我的图像/精灵,我需要更改从 pygame 获得的数组。也就是说,这是我第一次接触 pynum,所以如果我遗漏了什么,欢迎您纠正我的理解! :) @mac,如果是这种情况,那么我会推荐 eryksun 的解决方案,如 cmets 中所述:a_values[:] = np.vectorize(dim)(a_values)。它会创建一个副本,但切片分配 (a_values[:]) 会就地更改数组。如果这不起作用,请告诉我。 @mac,另外,如果您打算重用dim 的矢量化版本,最好给它自己的名字,这样您就不会一直调用vectorize @mac,最后,如果你可以使用 ufunc 重写 dim,那么 slice assignment + ufunc_dim (a_values[:] = ufunc_dim(a_values)) 将是最好的解决方案,放心吧。跨度> @eryksun,基于mac的新cmets,你的解决方案是最好的。你的回答会得到我的支持。【参考方案2】:

这是一篇散布在答案和 cmets,我在接受问题的答案后写的。 总是欢迎投票,但如果你赞成这个答案,请 不要错过也为 senderle 和(如果(s)他写道 一)eryksun,他提出了以下方法。

问:是否可以在原地映射一个 numpy 数组? 答:可以,但不能使用单一数组方法。您必须编写自己的代码。

下面是一个比较线程中讨论的各种实现的脚本:

import timeit
from numpy import array, arange, vectorize, rint

# SETUP
get_array = lambda side : arange(side**2).reshape(side, side) * 30
dim = lambda x : int(round(x * 0.67328))

# TIMER
def best(fname, reps, side):
    global a
    a = get_array(side)
        t = timeit.Timer('%s(a)' % fname,
                     setup='from __main__ import %s, a' % fname)
    return min(t.repeat(reps, 3))  #low num as in place --> converge to 1

# FUNCTIONS
def mac(array_):
    for row in range(len(array_)):
        for col in range(len(array_[0])):
            array_[row][col] = dim(array_[row][col])

def mac_two(array_):
    li = range(len(array_[0]))
    for row in range(len(array_)):
        for col in li:
            array_[row][col] = int(round(array_[row][col] * 0.67328))

def mac_three(array_):
    for i, row in enumerate(array_):
        array_[i][:] = [int(round(v * 0.67328)) for v in row]


def senderle(array_):
    array_ = array_.reshape(-1)
    for i, v in enumerate(array_):
        array_[i] = dim(v)

def eryksun(array_):
    array_[:] = vectorize(dim)(array_)

def ufunc_ed(array_):
    multiplied = array_ * 0.67328
    array_[:] = rint(multiplied)

# MAIN
r = []
for fname in ('mac', 'mac_two', 'mac_three', 'senderle', 'eryksun', 'ufunc_ed'):
    print('\nTesting `%s`...' % fname)
    r.append(best(fname, reps=50, side=50))
    # The following is for visually checking the functions returns same results
    tmp = get_array(3)
    eval('%s(tmp)' % fname)
    print tmp
tmp = min(r)/100
print('\n===== ...AND THE WINNER IS... =========================')
print('  mac (as in question)       :  %.4fms [%.0f%%]') % (r[0]*1000,r[0]/tmp)
print('  mac (optimised)            :  %.4fms [%.0f%%]') % (r[1]*1000,r[1]/tmp)
print('  mac (slice-assignment)     :  %.4fms [%.0f%%]') % (r[2]*1000,r[2]/tmp)
print('  senderle                   :  %.4fms [%.0f%%]') % (r[3]*1000,r[3]/tmp)
print('  eryksun                    :  %.4fms [%.0f%%]') % (r[4]*1000,r[4]/tmp)
print('  slice-assignment w/ ufunc  :  %.4fms [%.0f%%]') % (r[5]*1000,r[5]/tmp)
print('=======================================================\n')

上述脚本的输出——至少在我的系统中——是:

  mac (as in question)       :  88.7411ms [74591%]
  mac (optimised)            :  86.4639ms [72677%]
  mac (slice-assignment)     :  79.8671ms [67132%]
  senderle                   :  85.4590ms [71832%]
  eryksun                    :  13.8662ms [11655%]
  slice-assignment w/ ufunc  :  0.1190ms [100%]

如您所见,使用 numpy 的 ufunc 与次优和最差的替代方案相比,速度分别提高了 2 个数量级和近 3 个数量级

如果使用ufunc 不是一个选项,这里只是对其他替代方案的比较:

  mac (as in question)       :  91.5761ms [672%]
  mac (optimised)            :  88.9449ms [653%]
  mac (slice-assignment)     :  80.1032ms [588%]
  senderle                   :  86.3919ms [634%]
  eryksun                    :  13.6259ms [100%]

HTH!

【讨论】:

这是我见过的最好的自我回答之一,值得一票:)。 另外,分块切片分配技巧很有趣(在 mac_three 中),我发现自己想知道您是否可以使用 ufunc 在空间效率和时间效率之间取得有说服力的折衷列表理解——通过处理,比如说,每次迭代处理 10% 的数组,或者类似的东西。 @senderle - 感谢您对解决问题的赞赏和投入! ;) 在我的应用程序中,我只在初始化时使用这个函数来生成大约 100 个 10x10 像素的精灵,所以我并不是真的在追求超优化......我最初的问题真的只是受到希望让我的代码更整洁/学习新东西,但我准确地发布了测试的源代码,以便其他人继续玩这个,如果他们愿意的话! :) 我知道这是旧的,但是三个 cmets。 1. 我会在所有案例中将dim 等设为本地,以减少开销并更好地显示案例之间差异的比例。 2. Senderle 可以通过array_set = array_.__setitem__; any(array_set(i, dim(x)) for i, x in enumerate(array_)) 进行微优化。 3. 我不确定 eryksun 的版本是否真的就地。这有被追踪吗?在某些情况下,切片分配中的右侧项目会被完全评估以加速实际分配,因此会暂时创建一个副本。 我想以此作为练习来学习 Python 的奥秘。但是,对于 Python 3.3,我收到以下错误: Traceback(最近一次调用最后一次):文件“C:\Users\Adriano\Google Drive\python\test.py”,第 56 行,在 print(' mac (如问题):%.4fms [%.0f%%]') % (r[0]*1000,r[0]/tmp) TypeError: unsupported operand type(s) for %: 'NoneType' and 'tuple '【参考方案3】:

为什么不使用 numpy 实现和 out_ 技巧?

from numpy import array, arange, vectorize, rint, multiply, round as np_round 

def fmilo(array_):
    np_round(multiply(array_ ,0.67328, array_), out=array_)

得到:

===== ...AND THE WINNER IS... =========================
  mac (as in question)       :  80.8470ms [130422%]
  mac (optimised)            :  80.2400ms [129443%]
  mac (slice-assignment)     :  75.5181ms [121825%]
  senderle                   :  78.9380ms [127342%]
  eryksun                    :  11.0800ms [17874%]
  slice-assignment w/ ufunc  :  0.0899ms [145%]
  fmilo                      :  0.0620ms [100%]
=======================================================

【讨论】:

它看起来不错...但它不起作用! :( 您从此函数 ([[ 0 20 40][ 60 80 100][121 141 161]]) 获得的结果出于某种原因 - 与其他测试的结果不一致 ([[ 0 20 40][ 61 81 101][121 141 162]])。如果您能解决此问题,我很乐意将您的我的答案中的解决方案+为你的答案投票!:) @mac,@fabrizioM,我想我知道发生了什么。当您通过out 将输出数组传递给numpy ufunc 时,它会自动将casts the result 传递给输出数组的类型。因此,在这种情况下,浮点结果在存储之前被转换为 int(因此被截断)。所以fmilo 在功能上等同于array_ *= 0.67328。要获得所需的舍入行为,您必须执行rint((array_ * 0.67328), array_) 之类的操作。但在我的机器上,这实际上比切片分配要慢。【参考方案4】:

如果 ufuncs 是不可能的,你也许应该考虑使用 cython。 它很容易集成并在特定使用 numpy 数组时大大加快速度。

【讨论】:

真(+1)。如果您提供一个 sn-p 来实现这一点,我很乐意将它与测试等集成到我的答案中......【参考方案5】:

这只是 mac 文章的更新版本,针对 Python 3.x 实现,并添加了 numba 和 numpy.frompyfunc。

numpy.frompyfunc 接受一个任意的 python 函数并返回一个函数,该函数在 numpy.array 上强制转换时,按元素应用该函数。 但是它把数组的数据类型改成了object,所以没有到位,以后对这个数组的计算会比较慢。 为了避免这个缺点,在测试中将调用numpy.ndarray.astype,将数据类型返回为int。 作为旁注: Numba 不包含在 Python 的基本库中,如果您想对其进行测试,则必须从外部下载。在这个测试中,它实际上什么都不做,如果用 @jit(nopython=True) 调用它,它会给出一条错误消息,说它不能在那里优化任何东西。但是,由于 numba 通常可以加速以函数式编写的代码,因此包含它是为了完整性。

import timeit
from numpy import array, arange, vectorize, rint, frompyfunc
from numba import autojit

# SETUP
get_array = lambda side : arange(side**2).reshape(side, side) * 30
dim = lambda x : int(round(x * 0.67328))

# TIMER
def best(fname, reps, side):
    global a
    a = get_array(side)
    t = timeit.Timer('%s(a)' % fname,
                     setup='from __main__ import %s, a' % fname)
    return min(t.repeat(reps, 3))  #low num as in place --> converge to 1

# FUNCTIONS
def mac(array_):
    for row in range(len(array_)):
        for col in range(len(array_[0])):
            array_[row][col] = dim(array_[row][col])

def mac_two(array_):
    li = range(len(array_[0]))
    for row in range(len(array_)):
        for col in li:
            array_[row][col] = int(round(array_[row][col] * 0.67328))

def mac_three(array_):
    for i, row in enumerate(array_):
        array_[i][:] = [int(round(v * 0.67328)) for v in row]


def senderle(array_):
    array_ = array_.reshape(-1)
    for i, v in enumerate(array_):
        array_[i] = dim(v)

def eryksun(array_):
    array_[:] = vectorize(dim)(array_)

@autojit
def numba(array_):
    for row in range(len(array_)):
        for col in range(len(array_[0])):
            array_[row][col] = dim(array_[row][col])


def ufunc_ed(array_):
    multiplied = array_ * 0.67328
    array_[:] = rint(multiplied)

def ufunc_frompyfunc(array_):
    udim = frompyfunc(dim,1,1)
    array_ = udim(array_)
    array_.astype("int")

# MAIN
r = []
totest = ('mac', 'mac_two', 'mac_three', 'senderle', 'eryksun', 'numba','ufunc_ed','ufunc_frompyfunc')
for fname in totest:
    print('\nTesting `%s`...' % fname)
    r.append(best(fname, reps=50, side=50))
    # The following is for visually checking the functions returns same results
    tmp = get_array(3)
    eval('%s(tmp)' % fname)
    print (tmp)
tmp = min(r)/100
results = list(zip(totest,r))
results.sort(key=lambda x: x[1])

print('\n===== ...AND THE WINNER IS... =========================')
for name,time in results:
    Out = ':<34: :8.4fms [:5.0f%]'.format(name,time*1000,time/tmp)
    print(Out)
print('=======================================================\n')

最后,结果:

===== ...AND THE WINNER IS... =========================
ufunc_ed                          :   0.3205ms [  100%]
ufunc_frompyfunc                  :   3.8280ms [ 1194%]
eryksun                           :   3.8989ms [ 1217%]
mac_three                         :  21.4538ms [ 6694%]
senderle                          :  22.6421ms [ 7065%]
mac_two                           :  24.6230ms [ 7683%]
mac                               :  26.1463ms [ 8158%]
numba                             :  27.5041ms [ 8582%]
=======================================================

【讨论】:

以上是关于就地映射 NumPy 数组的主要内容,如果未能解决你的问题,请参考以下文章

将 NumPy 数组按元素映射到多维数组中

从 numpy 数组映射回 pandas 时间序列的最佳方法

如何就地反转 NumPy 数组?

Python:就地映射[重复]

Numpy 从谷歌云存储加载内存映射数组(mmap_mode)

再次对 Numpy 数组进行就地类型转换?