左循环 numpy 数组的最快方法(如弹出、推送队列)

Posted

技术标签:

【中文标题】左循环 numpy 数组的最快方法(如弹出、推送队列)【英文标题】:Fastest way to left-cycle a numpy array (like pop, push for a queue) 【发布时间】:2017-08-03 21:51:09 【问题描述】:

使用 numpy 数组,我想执行这个操作:

x[1],...,x[n-1] 移动到x[0],...,x[n-2](左移), 在最后一个索引中写入一个新值:x[n-1] = newvalue

这类似于先进后出队列的pop()push(newvalue)(仅倒置)。

一个简单的实现是:x[:-1] = x[1:]; x[-1] = newvalue

使用np.concatenate 的另一个实现速度较慢:np.concatenate((x[1:], np.array(newvalue).reshape(1,)), axis=0)

有没有最快的方法?

【问题讨论】:

注意:这与***.com/questions/30262736/…中的问题相同 “幼稚”版本对我来说看起来不错。为什么会有更快的东西?您必须将值复制到新数组或自身。当我在x=np.arange(100000) 上测试您的代码时,我得到的时间类似于21.5 µs per loop。这对我来说看起来很快。 如果不复制数组的内容就没有办法做到这一点,所以我认为你不能比“幼稚”的方法做得更好。如果这是一个瓶颈,那么您可能需要考虑使用不同的数据结构,例如一个deque,其中push-and-pop操作不需要复制,可以在恒定时间内完成。 好的,感谢您的回复,这也是我的直觉。我正在尝试使用双端队列。 哦,其实在算法中,不仅需要X[0]X[1],还需要数组中间的值,所以deque是没用的。对不起!无论如何感谢您的回复! 【参考方案1】:

经过一些实验,很明显:

需要复制, 对于nparray(numpy 数组)而言,最快、最简单的方法是切片和复制。

所以解决方案是:x[:-1] = x[1:]; x[-1] = newvalue

这是一个小基准:

>>> x = np.random.randint(0, 1e6, 10**8); newvalue = -100
>>> %timeit x[:-1] = x[1:]; x[-1] = newvalue
1000 loops, best of 3: 73.6 ms per loop
>>> %timeit np.concatenate((x[1:], np.array(newvalue).reshape(1,)), axis=0) 
1 loop, best of 3: 339 ms per loop

但是,如果您不需要快速访问数组中的所有值,而只需要第一个或最后一个值,则使用 deque 会更智能。

【讨论】:

当你使用双端队列时,这个例子会是什么样子?【参考方案2】:

我知道我迟到了,这个问题已经得到了满意的回答,但我刚刚遇到了类似的记录流数据缓冲区的问题。

您提到了“先进后出”,它是一个堆栈,但您的示例演示了一个队列,因此我将分享一个不需要复制以使新项目入队的队列的解决方案。 (您最终需要使用 numpy.roll 进行一次复制,以将最终数组传递给另一个函数。)

您可以使用带有指针的circular array 来跟踪尾部的位置(您将向队列中添加新项目的位置)。

如果你从这个数组开始:

x[0], x[1], x[2], x[3], x[4], x[5]
                               /\
                              tail

如果您想删除 x[0] 并添加 x[6],您可以使用最初为数组分配的内存来执行此操作,而无需复制

x[6], x[1], x[2], x[3], x[4], x[5]
 /\
tail

等等……

x[6], x[7], x[2], x[3], x[4], x[5]
       /\
      tail

每次入队时,您将尾部向右移动一个位置。您可以使用模数很好地包装:new_tail = (old_tail + 1) % length

找到队列的头部总是在尾部之后的一个位置。这可以使用相同的公式找到:head = (tail + 1) % length

            head
             \/
x[6], x[7], x[2], x[3], x[4], x[5]
       /\
      tail

这是我为这个循环缓冲区/数组创建的类的示例:

# benchmark_circular_buffer.py
import numpy as np

# all operations are O(1) and don't require copying the array
# except to_array which has to copy the array and is O(n)
class RecordingQueue1D:
    def __init__(self, object: object, maxlen: int):
        #allocate the memory we need ahead of time
        self.max_length: int = maxlen
        self.queue_tail: int = maxlen - 1
        o_len = len(object)
        if (o_len == maxlen):
            self.rec_queue = np.array(object, dtype=np.int64)
        elif (o_len > maxlen):
            self.rec_queue = np.array(object[o_len-maxlen:], dtype=np.int64)
        else:
            self.rec_queue = np.append(np.array(object, dtype=np.int64), np.zeros(maxlen-o_len, dtype=np.int64))
            self.queue_tail = o_len - 1

    def to_array(self) -> np.array:
        head = (self.queue_tail + 1) % self.max_length
        return np.roll(self.rec_queue, -head) # this will force a copy

    def enqueue(self, new_data: np.array) -> None:
        # move tail pointer forward then insert at the tail of the queue
        # to enforce max length of recording
        self.queue_tail = (self.queue_tail + 1) % self.max_length        
        self.rec_queue[self.queue_tail] = new_data

    def peek(self) -> int:
        queue_head = (self.queue_tail + 1) % self.max_length
        return self.rec_queue[queue_head]

    def replace_item_at(self, index: int, new_value: int):
        loc = (self.queue_tail + 1 + index) % self.max_length
        self.rec_queue[loc] = new_val

    def item_at(self, index: int) -> int:
        # the item we want will be at head + index
        loc = (self.queue_tail + 1 + index) % self.max_length
        return self.rec_queue[loc]

    def __repr__(self):
        return "tail: " + str(self.queue_tail) + "\narray: " + str(self.rec_queue)

    def __str__(self):
        return "tail: " + str(self.queue_tail) + "\narray: " + str(self.rec_queue)
        # return str(self.to_array())


rnd_arr = np.random.randint(0, 1e6, 10**8)
new_val = -100

slice_arr = rnd_arr.copy()
c_buf_arr = RecordingQueue1D(rnd_arr.copy(), len(rnd_arr))

# Test speed for queuing new a new item
# swapping items 100 and 1000
# swapping items 10000 and 100000
def slice_and_copy():
    slice_arr[:-1] = slice_arr[1:]
    slice_arr[-1] = new_val
    old = slice_arr[100]
    slice_arr[100] = slice_arr[1000]
    old = slice_arr[10000]
    slice_arr[10000] = slice_arr[100000]

def circular_buffer():
    c_buf_arr.enqueue(new_val)
    old = c_buf_arr.item_at(100)
    slice_arr[100] = slice_arr[1000]
    old = slice_arr[10000]
    slice_arr[10000] = slice_arr[100000]

# lets add copying the array to a new numpy.array
# this will take O(N) time for the circular buffer because we use numpy.roll()
# which copies the array.
def slice_and_copy_assignemnt():
    slice_and_copy()
    my_throwaway_arr = slice_arr.copy()
    return my_throwaway_arr

def circular_buffer_assignment():
    circular_buffer()
    my_throwaway_arr = c_buf_arr.to_array().copy()
    return my_throwaway_arr


# test using
# python -m timeit -s "import benchmark_circular_buffer as bcb" "bcb.slice_and_copy()"
# python -m timeit -s "import benchmark_circular_buffer as bcb" "bcb.circular_buffer()" 
# python -m timeit -r 5 -n 4 -s "import benchmark_circular_buffer as bcb" "bcb.slice_and_copy_assignemnt()"
# python -m timeit -r 5 -n 4 -s "import benchmark_circular_buffer as bcb" "bcb.circular_buffer_assignment()" 

当您必须将大量项目排入队列而不需要交出数组副本时,这比切片快几个数量级。

访问项目和替换项目是 O(1)。 Enqueue 和 peek 都是 O(1)。复制数组需要 O(n) 时间。

基准测试结果:

(thermal_venv) PS X:\win10\repos\thermal> python -m timeit -s "import benchmark_circular_buffer as bcb" "bcb.slice_and_copy()"
10 loops, best of 5: 36.7 msec per loop

(thermal_venv) PS X:\win10\repos\thermal> python -m timeit -s "import benchmark_circular_buffer as bcb" "bcb.circular_buffer()" 
200000 loops, best of 5: 1.04 usec per loop

(thermal_venv) PS X:\win10\repos\thermal> python -m timeit -s "import benchmark_circular_buffer as bcb" "bcb.slice_and_copy_assignemnt()"
2 loops, best of 5: 166 msec per loop

(thermal_venv) PS X:\win10\repos\thermal> python -m timeit -r 5 -n 4 -s "import benchmark_circular_buffer as bcb" "bcb.slice_and_copy_assignemnt()"
4 loops, best of 5: 159 msec per loop

(thermal_venv) PS X:\win10\repos\thermal> python -m timeit -r 5 -n 4 -s "import benchmark_circular_buffer as bcb" "bcb.circular_buffer_assignment()" 
4 loops, best of 5: 511 msec per loop

my GitHub here上有一个测试脚本和一个处理二维数组的实现

【讨论】:

您能否添加一个类似于我之前所做的基准 (***.com/a/42828629/5889533) 进行比较? @Næreen 这是个好主意,我会尝试一下并添加结果。 @Næreen 我对构造函数进行了一些显着的速度改进,并添加了一种方法来替换队列中任何位置的项目。不进行基准测试并直接将其用于生产是愚蠢的。我知道它会更快,但这仍然是我的懒惰,所以我很高兴你给了我推动力。正如我所料,对于将新项目排队,循环缓冲区比切片方法 1 微秒对 36 毫秒快几个数量级。但是,如果您需要复制数组,则开销会显着增加,因此您需要为给定场景选择合适的解决方案。 ? 干得好@L-co。你的模块很有趣!它需要的代码比我上面描述的(几年前)天真的切片和写入方法要多,但它既聪明又高效!

以上是关于左循环 numpy 数组的最快方法(如弹出、推送队列)的主要内容,如果未能解决你的问题,请参考以下文章

Spark 创建 numpy 数组 RDD 的最快方法

将交错的 NumPy 整数数组转换为 complex64 的最快方法是啥?

在numpy中计算超过阈值的数组值的最快方法

使用 NumPy 将 ubyte [0, 255] 数组转换为浮点数组 [-0.5, +0.5] 的最快方法

将包含 Numpy 数组的整个 HDF5 读入内存的最快方法

循环队列