就地映射 NumPy 数组
Posted
技术标签:
【中文标题】就地映射 NumPy 数组【英文标题】:Mapping a NumPy array in place 【发布时间】:2011-10-13 00:57:10 【问题描述】:是否可以在原地映射 NumPy 数组?如果是,如何?
鉴于a_values
- 2D 数组 - 这是目前对我有用的代码:
for row in range(len(a_values)):
for col in range(len(a_values[0])):
a_values[row][col] = dim(a_values[row][col])
但它太丑了,我怀疑在 NumPy 中的某个地方一定有一个函数可以对类似的东西做同样的事情:
a_values.map_in_place(dim)
但如果存在上述类似的东西,我一直找不到它。
【问题讨论】:
你可以做a_values = np.vectorize(dim)(a_values)
并避免嵌套循环,但这仍然没有到位,所以这不是答案。
我不知道有什么函数可以做到这一点,但如果有的话,它只会让代码看起来更整洁。如果你想要 Numpy 特有的性能加速,那么你需要重写 dim() 函数来直接处理 numpy 数组。
@eryksun 是的,但它仍然没有到位操作,所以它并没有好多少,并且可能会导致我所说的额外副本
我做出了我认为是滥用vectorize
的勇敢尝试,但我现在放弃了。借调鲍勃。
@senderle - 不管它值多少钱,你的英勇尝试似乎对我来说是完美的......(而且很漂亮,考虑到所有因素)出于模糊的好奇,它哪里出错了?
【参考方案1】:
只有在空间有限的情况下,才值得尝试就地执行此操作。如果是这种情况,可以通过迭代数组的扁平视图来稍微加快代码速度。由于reshape
返回一个新视图when possible,因此不会复制数据本身(除非原始数据具有不寻常的结构)。
我不知道有更好的方法来实现任意 Python 函数的真正就地应用。
>>> def flat_for(a, f):
... a = a.reshape(-1)
... for i, v in enumerate(a):
... a[i] = f(v)
...
>>> a = numpy.arange(25).reshape(5, 5)
>>> flat_for(a, lambda x: x + 5)
>>> a
array([[ 5, 6, 7, 8, 9],
[10, 11, 12, 13, 14],
[15, 16, 17, 18, 19],
[20, 21, 22, 23, 24],
[25, 26, 27, 28, 29]])
一些时间安排:
>>> a = numpy.arange(2500).reshape(50, 50)
>>> f = lambda x: x + 5
>>> %timeit flat_for(a, f)
1000 loops, best of 3: 1.86 ms per loop
它大约是嵌套循环版本的两倍:
>>> a = numpy.arange(2500).reshape(50, 50)
>>> def nested_for(a, f):
... for i in range(len(a)):
... for j in range(len(a[0])):
... a[i][j] = f(a[i][j])
...
>>> %timeit nested_for(a, f)
100 loops, best of 3: 3.79 ms per loop
当然,vectorize 仍然更快,所以如果你可以复制,就使用它:
>>> a = numpy.arange(2500).reshape(50, 50)
>>> g = numpy.vectorize(lambda x: x + 5)
>>> %timeit g(a)
1000 loops, best of 3: 584 us per loop
如果你可以使用内置的 ufunc 重写dim
,那么请不要vectorize
:
>>> a = numpy.arange(2500).reshape(50, 50)
>>> %timeit a + 5
100000 loops, best of 3: 4.66 us per loop
numpy
执行+=
之类的操作,正如您所期望的那样——因此您可以免费获得具有就地应用程序的 ufunc 的速度。有时甚至更快!示例见here。
顺便说一句,我对这个问题的原始答案(可以在其编辑历史中查看)是荒谬的,并且涉及将索引矢量化为a
。它不仅需要做一些时髦的事情来绕过vectorize
的type-detection mechanism,而且结果和嵌套循环版本一样慢。这么多聪明!
【讨论】:
感谢您的支持 (+1) - 我将在今天晚些时候开始工作时对其进行测试。至于为什么我需要这个....数组由pygame.surfarray.pixels2d
在内部使用。该数组是对图像像素值的引用,而不是它的副本,因此如果我想在场景中修改我的图像/精灵,我需要更改从 pygame 获得的数组。也就是说,这是我第一次接触 pynum,所以如果我遗漏了什么,欢迎您纠正我的理解! :)
@mac,如果是这种情况,那么我会推荐 eryksun 的解决方案,如 cmets 中所述:a_values[:] = np.vectorize(dim)(a_values)
。它会创建一个副本,但切片分配 (a_values[:]
) 会就地更改数组。如果这不起作用,请告诉我。
@mac,另外,如果您打算重用dim
的矢量化版本,最好给它自己的名字,这样您就不会一直调用vectorize
。
@mac,最后,如果你可以使用 ufunc 重写 dim,那么 slice assignment + ufunc_dim (a_values[:] = ufunc_dim(a_values)
) 将是最好的解决方案,放心吧。跨度>
@eryksun,基于mac的新cmets,你的解决方案是最好的。你的回答会得到我的支持。【参考方案2】:
这是一篇散布在答案和 cmets,我在接受问题的答案后写的。 总是欢迎投票,但如果你赞成这个答案,请 不要错过也为 senderle 和(如果(s)他写道 一)eryksun,他提出了以下方法。
问:是否可以在原地映射一个 numpy 数组? 答:可以,但不能使用单一数组方法。您必须编写自己的代码。
下面是一个比较线程中讨论的各种实现的脚本:
import timeit
from numpy import array, arange, vectorize, rint
# SETUP
get_array = lambda side : arange(side**2).reshape(side, side) * 30
dim = lambda x : int(round(x * 0.67328))
# TIMER
def best(fname, reps, side):
global a
a = get_array(side)
t = timeit.Timer('%s(a)' % fname,
setup='from __main__ import %s, a' % fname)
return min(t.repeat(reps, 3)) #low num as in place --> converge to 1
# FUNCTIONS
def mac(array_):
for row in range(len(array_)):
for col in range(len(array_[0])):
array_[row][col] = dim(array_[row][col])
def mac_two(array_):
li = range(len(array_[0]))
for row in range(len(array_)):
for col in li:
array_[row][col] = int(round(array_[row][col] * 0.67328))
def mac_three(array_):
for i, row in enumerate(array_):
array_[i][:] = [int(round(v * 0.67328)) for v in row]
def senderle(array_):
array_ = array_.reshape(-1)
for i, v in enumerate(array_):
array_[i] = dim(v)
def eryksun(array_):
array_[:] = vectorize(dim)(array_)
def ufunc_ed(array_):
multiplied = array_ * 0.67328
array_[:] = rint(multiplied)
# MAIN
r = []
for fname in ('mac', 'mac_two', 'mac_three', 'senderle', 'eryksun', 'ufunc_ed'):
print('\nTesting `%s`...' % fname)
r.append(best(fname, reps=50, side=50))
# The following is for visually checking the functions returns same results
tmp = get_array(3)
eval('%s(tmp)' % fname)
print tmp
tmp = min(r)/100
print('\n===== ...AND THE WINNER IS... =========================')
print(' mac (as in question) : %.4fms [%.0f%%]') % (r[0]*1000,r[0]/tmp)
print(' mac (optimised) : %.4fms [%.0f%%]') % (r[1]*1000,r[1]/tmp)
print(' mac (slice-assignment) : %.4fms [%.0f%%]') % (r[2]*1000,r[2]/tmp)
print(' senderle : %.4fms [%.0f%%]') % (r[3]*1000,r[3]/tmp)
print(' eryksun : %.4fms [%.0f%%]') % (r[4]*1000,r[4]/tmp)
print(' slice-assignment w/ ufunc : %.4fms [%.0f%%]') % (r[5]*1000,r[5]/tmp)
print('=======================================================\n')
上述脚本的输出——至少在我的系统中——是:
mac (as in question) : 88.7411ms [74591%]
mac (optimised) : 86.4639ms [72677%]
mac (slice-assignment) : 79.8671ms [67132%]
senderle : 85.4590ms [71832%]
eryksun : 13.8662ms [11655%]
slice-assignment w/ ufunc : 0.1190ms [100%]
如您所见,使用 numpy 的 ufunc
与次优和最差的替代方案相比,速度分别提高了 2 个数量级和近 3 个数量级。
如果使用ufunc
不是一个选项,这里只是对其他替代方案的比较:
mac (as in question) : 91.5761ms [672%]
mac (optimised) : 88.9449ms [653%]
mac (slice-assignment) : 80.1032ms [588%]
senderle : 86.3919ms [634%]
eryksun : 13.6259ms [100%]
HTH!
【讨论】:
这是我见过的最好的自我回答之一,值得一票:)。 另外,分块切片分配技巧很有趣(在mac_three
中),我发现自己想知道您是否可以使用 ufunc 在空间效率和时间效率之间取得有说服力的折衷列表理解——通过处理,比如说,每次迭代处理 10% 的数组,或者类似的东西。
@senderle - 感谢您对解决问题的赞赏和投入! ;) 在我的应用程序中,我只在初始化时使用这个函数来生成大约 100 个 10x10 像素的精灵,所以我并不是真的在追求超优化......我最初的问题真的只是受到希望让我的代码更整洁/学习新东西,但我准确地发布了测试的源代码,以便其他人继续玩这个,如果他们愿意的话! :)
我知道这是旧的,但是三个 cmets。 1. 我会在所有案例中将dim
等设为本地,以减少开销并更好地显示案例之间差异的比例。 2. Senderle 可以通过array_set = array_.__setitem__; any(array_set(i, dim(x)) for i, x in enumerate(array_))
进行微优化。 3. 我不确定 eryksun 的版本是否真的就地。这有被追踪吗?在某些情况下,切片分配中的右侧项目会被完全评估以加速实际分配,因此会暂时创建一个副本。
我想以此作为练习来学习 Python 的奥秘。但是,对于 Python 3.3,我收到以下错误: Traceback(最近一次调用最后一次):文件“C:\Users\Adriano\Google Drive\python\test.py”,第 56 行,在 为什么不使用 numpy 实现和 out_ 技巧?
from numpy import array, arange, vectorize, rint, multiply, round as np_round
def fmilo(array_):
np_round(multiply(array_ ,0.67328, array_), out=array_)
得到:
===== ...AND THE WINNER IS... =========================
mac (as in question) : 80.8470ms [130422%]
mac (optimised) : 80.2400ms [129443%]
mac (slice-assignment) : 75.5181ms [121825%]
senderle : 78.9380ms [127342%]
eryksun : 11.0800ms [17874%]
slice-assignment w/ ufunc : 0.0899ms [145%]
fmilo : 0.0620ms [100%]
=======================================================
【讨论】:
它看起来不错...但它不起作用! :( 您从此函数 ([[ 0 20 40][ 60 80 100][121 141 161]]
) 获得的结果出于某种原因 - 与其他测试的结果不一致 ([[ 0 20 40][ 61 81 101][121 141 162]]
)。如果您能解决此问题,我很乐意将您的我的答案中的解决方案+为你的答案投票!:)
@mac,@fabrizioM,我想我知道发生了什么。当您通过out
将输出数组传递给numpy ufunc 时,它会自动将casts the result 传递给输出数组的类型。因此,在这种情况下,浮点结果在存储之前被转换为 int(因此被截断)。所以fmilo
在功能上等同于array_ *= 0.67328
。要获得所需的舍入行为,您必须执行rint((array_ * 0.67328), array_)
之类的操作。但在我的机器上,这实际上比切片分配要慢。【参考方案4】:
如果 ufuncs 是不可能的,你也许应该考虑使用 cython。 它很容易集成并在特定使用 numpy 数组时大大加快速度。
【讨论】:
真(+1)。如果您提供一个 sn-p 来实现这一点,我很乐意将它与测试等集成到我的答案中......【参考方案5】:这只是 mac 文章的更新版本,针对 Python 3.x 实现,并添加了 numba 和 numpy.frompyfunc。
numpy.frompyfunc 接受一个任意的 python 函数并返回一个函数,该函数在 numpy.array 上强制转换时,按元素应用该函数。 但是它把数组的数据类型改成了object,所以没有到位,以后对这个数组的计算会比较慢。 为了避免这个缺点,在测试中将调用numpy.ndarray.astype,将数据类型返回为int。 作为旁注: Numba 不包含在 Python 的基本库中,如果您想对其进行测试,则必须从外部下载。在这个测试中,它实际上什么都不做,如果用 @jit(nopython=True) 调用它,它会给出一条错误消息,说它不能在那里优化任何东西。但是,由于 numba 通常可以加速以函数式编写的代码,因此包含它是为了完整性。
import timeit
from numpy import array, arange, vectorize, rint, frompyfunc
from numba import autojit
# SETUP
get_array = lambda side : arange(side**2).reshape(side, side) * 30
dim = lambda x : int(round(x * 0.67328))
# TIMER
def best(fname, reps, side):
global a
a = get_array(side)
t = timeit.Timer('%s(a)' % fname,
setup='from __main__ import %s, a' % fname)
return min(t.repeat(reps, 3)) #low num as in place --> converge to 1
# FUNCTIONS
def mac(array_):
for row in range(len(array_)):
for col in range(len(array_[0])):
array_[row][col] = dim(array_[row][col])
def mac_two(array_):
li = range(len(array_[0]))
for row in range(len(array_)):
for col in li:
array_[row][col] = int(round(array_[row][col] * 0.67328))
def mac_three(array_):
for i, row in enumerate(array_):
array_[i][:] = [int(round(v * 0.67328)) for v in row]
def senderle(array_):
array_ = array_.reshape(-1)
for i, v in enumerate(array_):
array_[i] = dim(v)
def eryksun(array_):
array_[:] = vectorize(dim)(array_)
@autojit
def numba(array_):
for row in range(len(array_)):
for col in range(len(array_[0])):
array_[row][col] = dim(array_[row][col])
def ufunc_ed(array_):
multiplied = array_ * 0.67328
array_[:] = rint(multiplied)
def ufunc_frompyfunc(array_):
udim = frompyfunc(dim,1,1)
array_ = udim(array_)
array_.astype("int")
# MAIN
r = []
totest = ('mac', 'mac_two', 'mac_three', 'senderle', 'eryksun', 'numba','ufunc_ed','ufunc_frompyfunc')
for fname in totest:
print('\nTesting `%s`...' % fname)
r.append(best(fname, reps=50, side=50))
# The following is for visually checking the functions returns same results
tmp = get_array(3)
eval('%s(tmp)' % fname)
print (tmp)
tmp = min(r)/100
results = list(zip(totest,r))
results.sort(key=lambda x: x[1])
print('\n===== ...AND THE WINNER IS... =========================')
for name,time in results:
Out = ':<34: :8.4fms [:5.0f%]'.format(name,time*1000,time/tmp)
print(Out)
print('=======================================================\n')
最后,结果:
===== ...AND THE WINNER IS... =========================
ufunc_ed : 0.3205ms [ 100%]
ufunc_frompyfunc : 3.8280ms [ 1194%]
eryksun : 3.8989ms [ 1217%]
mac_three : 21.4538ms [ 6694%]
senderle : 22.6421ms [ 7065%]
mac_two : 24.6230ms [ 7683%]
mac : 26.1463ms [ 8158%]
numba : 27.5041ms [ 8582%]
=======================================================
【讨论】:
以上是关于就地映射 NumPy 数组的主要内容,如果未能解决你的问题,请参考以下文章
从 numpy 数组映射回 pandas 时间序列的最佳方法