左循环 numpy 数组的最快方法(如弹出、推送队列)
Posted
技术标签:
【中文标题】左循环 numpy 数组的最快方法(如弹出、推送队列)【英文标题】:Fastest way to left-cycle a numpy array (like pop, push for a queue) 【发布时间】:2017-08-03 21:51:09 【问题描述】:使用 numpy 数组,我想执行这个操作:
将x[1],...,x[n-1]
移动到x[0],...,x[n-2]
(左移),
在最后一个索引中写入一个新值:x[n-1] = newvalue
。
这类似于先进后出队列的pop()
、push(newvalue)
(仅倒置)。
一个简单的实现是:x[:-1] = x[1:]; x[-1] = newvalue
。
使用np.concatenate
的另一个实现速度较慢:np.concatenate((x[1:], np.array(newvalue).reshape(1,)), axis=0)
。
有没有最快的方法?
【问题讨论】:
注意:这与***.com/questions/30262736/…中的问题不相同 “幼稚”版本对我来说看起来不错。为什么会有更快的东西?您必须将值复制到新数组或自身。当我在x=np.arange(100000)
上测试您的代码时,我得到的时间类似于21.5 µs per loop
。这对我来说看起来很快。
如果不复制数组的内容就没有办法做到这一点,所以我认为你不能比“幼稚”的方法做得更好。如果这是一个瓶颈,那么您可能需要考虑使用不同的数据结构,例如一个deque
,其中push-and-pop操作不需要复制,可以在恒定时间内完成。
好的,感谢您的回复,这也是我的直觉。我正在尝试使用双端队列。
哦,其实在算法中,不仅需要X[0]
和X[1]
,还需要数组中间的值,所以deque
是没用的。对不起!无论如何感谢您的回复!
【参考方案1】:
经过一些实验,很明显:
需要复制, 对于nparray
(numpy 数组)而言,最快、最简单的方法是切片和复制。
所以解决方案是:x[:-1] = x[1:]; x[-1] = newvalue
。
这是一个小基准:
>>> x = np.random.randint(0, 1e6, 10**8); newvalue = -100
>>> %timeit x[:-1] = x[1:]; x[-1] = newvalue
1000 loops, best of 3: 73.6 ms per loop
>>> %timeit np.concatenate((x[1:], np.array(newvalue).reshape(1,)), axis=0)
1 loop, best of 3: 339 ms per loop
但是,如果您不需要快速访问数组中的所有值,而只需要第一个或最后一个值,则使用 deque
会更智能。
【讨论】:
当你使用双端队列时,这个例子会是什么样子?【参考方案2】:我知道我迟到了,这个问题已经得到了满意的回答,但我刚刚遇到了类似的记录流数据缓冲区的问题。
您提到了“先进后出”,它是一个堆栈,但您的示例演示了一个队列,因此我将分享一个不需要复制以使新项目入队的队列的解决方案。 (您将最终需要使用 numpy.roll 进行一次复制,以将最终数组传递给另一个函数。)
您可以使用带有指针的circular array 来跟踪尾部的位置(您将向队列中添加新项目的位置)。
如果你从这个数组开始:
x[0], x[1], x[2], x[3], x[4], x[5]
/\
tail
如果您想删除 x[0] 并添加 x[6],您可以使用最初为数组分配的内存来执行此操作,而无需复制
x[6], x[1], x[2], x[3], x[4], x[5]
/\
tail
等等……
x[6], x[7], x[2], x[3], x[4], x[5]
/\
tail
每次入队时,您将尾部向右移动一个位置。您可以使用模数很好地包装:new_tail = (old_tail + 1) % length
。
找到队列的头部总是在尾部之后的一个位置。这可以使用相同的公式找到:head = (tail + 1) % length
。
head
\/
x[6], x[7], x[2], x[3], x[4], x[5]
/\
tail
这是我为这个循环缓冲区/数组创建的类的示例:
# benchmark_circular_buffer.py
import numpy as np
# all operations are O(1) and don't require copying the array
# except to_array which has to copy the array and is O(n)
class RecordingQueue1D:
def __init__(self, object: object, maxlen: int):
#allocate the memory we need ahead of time
self.max_length: int = maxlen
self.queue_tail: int = maxlen - 1
o_len = len(object)
if (o_len == maxlen):
self.rec_queue = np.array(object, dtype=np.int64)
elif (o_len > maxlen):
self.rec_queue = np.array(object[o_len-maxlen:], dtype=np.int64)
else:
self.rec_queue = np.append(np.array(object, dtype=np.int64), np.zeros(maxlen-o_len, dtype=np.int64))
self.queue_tail = o_len - 1
def to_array(self) -> np.array:
head = (self.queue_tail + 1) % self.max_length
return np.roll(self.rec_queue, -head) # this will force a copy
def enqueue(self, new_data: np.array) -> None:
# move tail pointer forward then insert at the tail of the queue
# to enforce max length of recording
self.queue_tail = (self.queue_tail + 1) % self.max_length
self.rec_queue[self.queue_tail] = new_data
def peek(self) -> int:
queue_head = (self.queue_tail + 1) % self.max_length
return self.rec_queue[queue_head]
def replace_item_at(self, index: int, new_value: int):
loc = (self.queue_tail + 1 + index) % self.max_length
self.rec_queue[loc] = new_val
def item_at(self, index: int) -> int:
# the item we want will be at head + index
loc = (self.queue_tail + 1 + index) % self.max_length
return self.rec_queue[loc]
def __repr__(self):
return "tail: " + str(self.queue_tail) + "\narray: " + str(self.rec_queue)
def __str__(self):
return "tail: " + str(self.queue_tail) + "\narray: " + str(self.rec_queue)
# return str(self.to_array())
rnd_arr = np.random.randint(0, 1e6, 10**8)
new_val = -100
slice_arr = rnd_arr.copy()
c_buf_arr = RecordingQueue1D(rnd_arr.copy(), len(rnd_arr))
# Test speed for queuing new a new item
# swapping items 100 and 1000
# swapping items 10000 and 100000
def slice_and_copy():
slice_arr[:-1] = slice_arr[1:]
slice_arr[-1] = new_val
old = slice_arr[100]
slice_arr[100] = slice_arr[1000]
old = slice_arr[10000]
slice_arr[10000] = slice_arr[100000]
def circular_buffer():
c_buf_arr.enqueue(new_val)
old = c_buf_arr.item_at(100)
slice_arr[100] = slice_arr[1000]
old = slice_arr[10000]
slice_arr[10000] = slice_arr[100000]
# lets add copying the array to a new numpy.array
# this will take O(N) time for the circular buffer because we use numpy.roll()
# which copies the array.
def slice_and_copy_assignemnt():
slice_and_copy()
my_throwaway_arr = slice_arr.copy()
return my_throwaway_arr
def circular_buffer_assignment():
circular_buffer()
my_throwaway_arr = c_buf_arr.to_array().copy()
return my_throwaway_arr
# test using
# python -m timeit -s "import benchmark_circular_buffer as bcb" "bcb.slice_and_copy()"
# python -m timeit -s "import benchmark_circular_buffer as bcb" "bcb.circular_buffer()"
# python -m timeit -r 5 -n 4 -s "import benchmark_circular_buffer as bcb" "bcb.slice_and_copy_assignemnt()"
# python -m timeit -r 5 -n 4 -s "import benchmark_circular_buffer as bcb" "bcb.circular_buffer_assignment()"
当您必须将大量项目排入队列而不需要交出数组副本时,这比切片快几个数量级。
访问项目和替换项目是 O(1)。 Enqueue 和 peek 都是 O(1)。复制数组需要 O(n) 时间。
基准测试结果:
(thermal_venv) PS X:\win10\repos\thermal> python -m timeit -s "import benchmark_circular_buffer as bcb" "bcb.slice_and_copy()"
10 loops, best of 5: 36.7 msec per loop
(thermal_venv) PS X:\win10\repos\thermal> python -m timeit -s "import benchmark_circular_buffer as bcb" "bcb.circular_buffer()"
200000 loops, best of 5: 1.04 usec per loop
(thermal_venv) PS X:\win10\repos\thermal> python -m timeit -s "import benchmark_circular_buffer as bcb" "bcb.slice_and_copy_assignemnt()"
2 loops, best of 5: 166 msec per loop
(thermal_venv) PS X:\win10\repos\thermal> python -m timeit -r 5 -n 4 -s "import benchmark_circular_buffer as bcb" "bcb.slice_and_copy_assignemnt()"
4 loops, best of 5: 159 msec per loop
(thermal_venv) PS X:\win10\repos\thermal> python -m timeit -r 5 -n 4 -s "import benchmark_circular_buffer as bcb" "bcb.circular_buffer_assignment()"
4 loops, best of 5: 511 msec per loop
my GitHub here上有一个测试脚本和一个处理二维数组的实现
【讨论】:
您能否添加一个类似于我之前所做的基准 (***.com/a/42828629/5889533) 进行比较? @Næreen 这是个好主意,我会尝试一下并添加结果。 @Næreen 我对构造函数进行了一些显着的速度改进,并添加了一种方法来替换队列中任何位置的项目。不进行基准测试并直接将其用于生产是愚蠢的。我知道它会更快,但这仍然是我的懒惰,所以我很高兴你给了我推动力。正如我所料,对于将新项目排队,循环缓冲区比切片方法 1 微秒对 36 毫秒快几个数量级。但是,如果您需要复制数组,则开销会显着增加,因此您需要为给定场景选择合适的解决方案。 ? 干得好@L-co。你的模块很有趣!它需要的代码比我上面描述的(几年前)天真的切片和写入方法要多,但它既聪明又高效!以上是关于左循环 numpy 数组的最快方法(如弹出、推送队列)的主要内容,如果未能解决你的问题,请参考以下文章
将交错的 NumPy 整数数组转换为 complex64 的最快方法是啥?
使用 NumPy 将 ubyte [0, 255] 数组转换为浮点数组 [-0.5, +0.5] 的最快方法