


【中文标题】滚动或滑动窗口迭代器?【英文标题】:Rolling or sliding window iterator?

我需要一个可在序列/迭代器/生成器上迭代的滚动窗口(又名滑动窗口)。默认 Python 迭代可以被认为是一种特殊情况,其中窗口长度为 1。我目前正在使用以下代码。有没有人有更 Pythonic、更少冗长或更有效的方法来做到这一点?

def rolling_window(seq, window_size):
    it = iter(seq)
    win = [it.next() for cnt in xrange(window_size)] # First window
    yield win
    for e in it: # Subsequent windows
        win[:-1] = win[1:]
        win[-1] = e
        yield win

if __name__=="__main__":
    for w in rolling_window(xrange(6), 3):
        print w

"""Example output:

   [0, 1, 2]
   [1, 2, 3]
   [2, 3, 4]
   [3, 4, 5]


如果您希望在迭代时对每个窗口执行某种操作(例如 sum()max()),请记住有有效的算法来计算新的恒定时间内每个窗口的值(与窗口大小无关)。我在 Python 库中收集了其中一些算法:rolling。 【参考方案1】:

这似乎是为collections.deque 量身定做的,因为您基本上有一个 FIFO(添加到一端,从另一端移除)。但是,即使您使用list,也不应该切片两次;相反,您可能应该只是列表中的pop(0) 和新项目中的append()


from collections import deque

def window(seq, n=2):
    it = iter(seq)
    win = deque((next(it, None) for _ in xrange(n)), maxlen=n)
    yield win
    append = win.append
    for e in it:
        yield win

在我的测试中,它大部分时间都轻松地击败了此处发布的所有其他内容,尽管 pillmuncher 的 tee 版本在大可迭代对象和小窗口方面胜过它。在较大的窗口上,deque 再次以原始速度领先。

访问deque 中的单个项目可能比使用列表或元组更快或更慢。 (如果使用负索引,靠近开头的项目更快,或者靠近结尾的项目。)我在循环体中放了一个sum(w);这发挥了双端队列的优势(从一个项目迭代到下一个项目很快,所以这个循环的运行速度比下一个最快的方法,pilmuncher 的方法快 20%)。当我将其更改为在 10 个窗口中单独查找和添加项目时,表格发生了翻天覆地的变化,tee 方法的速度提高了 20%。通过在最后五个术语中使用负索引,我能够恢复一些速度,但tee 仍然快一点。总体而言,我估计任何一种对于大多数用途来说都足够快,如果您需要更高的性能,请分析并选择效果最好的一种。


yield win 应该是 yield tuple(win)yield list(win) 以防止返回对同一 deque 对象的引用的迭代器。 我提交了这个to PyPI。使用pip install sliding_window 安装,使用from sliding_window import window 运行。 如果您认为 list(window(range(10))) 应该产生类似 [[0,1],[1,2],[2,3],...] 的内容,您会大吃一惊的。跨度> 显然不会;您需要执行类似list(list(x) for x in window(range(10))) 的操作,否则将其添加到迭代器中。对于某些应用程序来说,这很重要,而对于其他应用程序则不重要,因为我追求速度,所以我选择 not 并让调用者有责任在需要时复制窗口。 如果你在yield之前加回需要的tuple(),这种方法与其他方法相比没有任何优势。【参考方案2】:

旧版本的 Python 文档中有一个 itertools examples:

from itertools import islice

def window(seq, n=2):
    "Returns a sliding window (of width n) over data from the iterable"
    "   s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...                   "
    it = iter(seq)
    result = tuple(islice(it, n))
    if len(result) == n:
        yield result
    for elem in it:
        result = result[1:] + (elem,)
        yield result

文档中的那个更简洁一些,并使用itertools 来达到我想象的更大效果。


seq = [0, 1, 2, 3, 4, 5]
window_size = 3

for i in range(len(seq) - window_size + 1):
    print(seq[i: i + window_size])


[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]


不错的答案,但是(我知道你只是在复制链接的配方),我想知道为什么默认窗口大小应该是 2?它应该有一个默认值吗? @TakenMacGuy:我不知道该配方的作者的推理是什么,但我也会选择 2。2 是最小的有用窗口大小(否则你只是在迭代并且不需要窗口),并且需要知道上一个(或下一个)项目也很常见,可以说比任何其他特定的 n 都要多。 有谁知道为什么这个例子被从文档中删除了?是不是有什么问题,或者现在有更简单的选择? 对示例删除感到好奇,发现rhettinger committed on Oct 26, 2003: Replace the window() example with pairwise() which demonstrates tee(). 什么时候进入for elem in it循环?【参考方案3】:


from itertools import tee, izip

def window(iterable, size):
    iters = tee(iterable, size)
    for i in xrange(1, size):
        for each in iters[i:]:
            next(each, None)
    return izip(*iters)

for each in window(xrange(6), 3):
    print list(each)


[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]


从我的快速timeit 测试来看,这比 Daniel DePaolo 的慢得多(大约 2:1 的比例),并且感觉并不“更好”。 @David B.:在我的机器上,它只比 Daniel DePaolo 的慢 8%。 @pillmuncher:Python 2.7 还是 3.x?我用的是2.7。该比率对size 的值也相当敏感。如果增加它(例如,如果可迭代对象的长度为 100000 个元素,则将窗口大小设置为 1000),您可能会看到增加。 @David B.:你说的很有道理。在我的代码中,iters 的设置时间为 O(size!),并且多次调用next()(在izip() 中)可能比复制一个元组两次更耗时。我使用的是 Python 2.6.5,顺便说一句。 @pillmuncher:你的意思是,iters的设置时间是 O(size^2),对吧?【参考方案4】:



def slidingWindow(sequence,winSize,step=1):
"""Returns a generator that will iterate through
the defined chunks of input sequence. Input sequence
must be sliceable."""

    # Verify the inputs
    if not ((type(winSize) == type(0)) and (type(step) == type(0))):
        raise Exception("**ERROR** type(winSize) and type(step) must be int.")
    if step > winSize:
        raise Exception("**ERROR** step must not be larger than winSize.")
    if winSize > len(sequence):
        raise Exception("**ERROR** winSize must not be larger than sequence length.")

    # Pre-compute number of chunks to emit
    numOfChunks = ((len(sequence)-winSize)/step)+1

    # Do the work
    for i in range(0,numOfChunks*step,step):
        yield sequence[i:i+winSize]


这里的主要缺点是len(sequence) 调用。如果sequence 是迭代器或生成器,这将不起作用。当输入确实适合内存时,这确实提供了比迭代器更具可读性的解决方案。 是的,你是对的。这种特殊情况最初是为了扫描通常表示为字符串的 DNA 序列。它当然有你提到的限制。如果你愿意,你可以简单地测试每个切片以确保它仍然是正确的长度,然后忘记必须知道整个序列的长度。但这会增加一点开销(每次迭代都进行一次 len() 测试)。【参考方案5】:


由于当前的 python 文档在 itertool 示例中没有“窗口”(即,在http://docs.python.org/library/itertools.html 的底部),这里有一个基于 grouper 的代码,这是给出的示例之一:

import itertools as it
def window(iterable, size):
    shiftedStarts = [it.islice(iterable, s, None) for s in xrange(size)]
    return it.izip(*shiftedStarts)



原来上面的代码是错误的。如果传递给 iterable 的参数是一个序列,但如果它是一个迭代器,它就可以工作。如果它是一个迭代器,则在 islice 调用之间共享(但不是 tee'd)相同的迭代器,这会严重破坏事情。


import itertools as it
def window(iterable, size):
    itrs = it.tee(iterable, size)
    shiftedStarts = [it.islice(anItr, s, None) for s, anItr in enumerate(itrs)]
    return it.izip(*shiftedStarts)

另外,书籍的另一个版本。这个版本不是复制一个迭代器然后多次推进副本,而是在我们向前移动起始位置时对每个迭代器进行成对副本。因此,迭代器 t 既提供了起点为 t 的“完整”迭代器,也提供了创建迭代器 t + 1 的基础:

import itertools as it
def window4(iterable, size):
    complete_itr, incomplete_itr = it.tee(iterable, 2)
    iters = [complete_itr]
    for i in xrange(1, size):
        complete_itr, incomplete_itr = it.tee(incomplete_itr, 2)
    return it.izip(*iters)



这是一个概括,增加了对stepfillvalue 参数的支持:

from collections import deque
from itertools import islice

def sliding_window(iterable, size=2, step=1, fillvalue=None):
    if size < 0 or step < 1:
        raise ValueError
    it = iter(iterable)
    q = deque(islice(it, size), maxlen=size)
    if not q:
        return  # empty iterable or size == 0
    q.extend(fillvalue for _ in range(size - len(q)))  # pad to size
    while True:
        yield iter(q)  # iter() to avoid accidental outside modifications
        except StopIteration: # Python 3.5 pep 479 support
        q.extend(next(it, fillvalue) for _ in range(step - 1))

它在每次迭代时以块 size 的形式产生项目滚动 step 位置,如有必要,用 fillvalue 填充每个块。 size=4, step=3, fillvalue='*' 的示例:

 [a b c d]e f g h i j k l m n o p q r s t u v w x y z
  a b c[d e f g]h i j k l m n o p q r s t u v w x y z
  a b c d e f[g h i j]k l m n o p q r s t u v w x y z
  a b c d e f g h i[j k l m]n o p q r s t u v w x y z
  a b c d e f g h i j k l[m n o p]q r s t u v w x y z
  a b c d e f g h i j k l m n o[p q r s]t u v w x y z
  a b c d e f g h i j k l m n o p q r[s t u v]w x y z
  a b c d e f g h i j k l m n o p q r s t u[v w x y]z
  a b c d e f g h i j k l m n o p q r s t u v w x[y z * *]

有关step 参数的用例示例,请参阅Processing a large .txt file in python efficiently。




from collections import deque
def window(seq, n=2):
    it = iter(seq)
    win = deque((next(it, None) for _ in xrange(1)), maxlen=n)
    yield win
    append = win.append
    for e in it:
        yield win
    for _ in xrange(len(win)-1):
        yield win

for wnd in window(range(5), n=3):


[0, 1]
[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4]


>>> n, m = 6, 3
>>> k = n - m+1
>>> print ('\n'*(k)).format(*[range(i, i+m) for i in xrange(k)])
[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]




def window(seq, size, step=1):
    # initialize iterators
    iters = [iter(seq) for i in range(size)]
    # stagger iterators (without yielding)
    [next(iters[i]) for j in range(size) for i in range(-1, -j-1, -1)]
        yield [next(i) for i in iters]
        # next line does nothing for step = 1 (skips iterations for step > 1)
        [next(i) for i in iters for j in range(step-1)]

next(it) 在序列完成时引发StopIteration,出于某种我无法理解的酷原因,此处的 yield 语句将其排除在外,函数返回,忽略不形成完整窗口的剩余值。

无论如何,这是行数最少的解决方案,其唯一要求是seq 实现__iter____getitem__,并且不依赖itertoolscollections 除了@dansalmo 的解决方案:)


注意:交错步骤是 O(n^2),其中 n 是窗口的大小,并且仅在第一次调用时发生。它可以优化到 O(n),但它会使代码有点混乱:P【参考方案10】:
def rolling_window(list, degree):
    for i in range(len(list)-degree+1):
        yield [list[i+o] for o in range(degree)]



[list[i+o] for o in range(degree)] 等价于list[i:i+degree]【参考方案11】:


def pairwise(iterable):
    "s -> (s0,s1), (s1,s2), (s2, s3), ..."
    a, b = tee(iterable)
    next(b, None)
    return zip(a, b)

它记录在 Python doc 中。 您可以轻松地将其扩展到更宽的窗口。




mylist = [1, 2, 3, 4, 5, 6, 7]

def sliding_window(l, window_size=2):
    if window_size > len(l):
        raise ValueError("Window size must be smaller or equal to the number of elements in the list.")

    t = []
    for i in xrange(0, window_size):

    return zip(*t)

print sliding_window(mylist, 3)


[(1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6), (5, 6, 7)]


@keocra zip(*t) 是什么意思?我在哪里可以找到关于这种声明的一些文档? Python 2.7:docs.python.org/2/library/functions.html#zip,星号解包列表并将单个元素作为输入提供给 zip (unpacking arguments)【参考方案13】:
def GetShiftingWindows(thelist, size):
    return [ thelist[x:x+size] for x in range( len(thelist) - size + 1 ) ]

>> a = [1, 2, 3, 4, 5]
>> GetShiftingWindows(a, 3)
[ [1, 2, 3], [2, 3, 4], [3, 4, 5] ]


你在 Python 中看到“range(len”) 的那一刻,就是代码异味。 @MarkLawrence 是什么让你认为 range(len 在 python 中是一个糟糕的模式?【参考方案14】:

只是为了展示如何组合itertools recipes,我将使用consume 配方将pairwise 配方尽可能直接地扩展回window 配方:

def consume(iterator, n):
    "Advance the iterator n-steps ahead. If n is none, consume entirely."
    # Use functions that consume iterators at C speed.
    if n is None:
        # feed the entire iterator into a zero-length deque
        collections.deque(iterator, maxlen=0)
        # advance to the empty slice starting at position n
        next(islice(iterator, n, n), None)

def window(iterable, n=2):
    "s -> (s0, ...,s(n-1)), (s1, ...,sn), (s2, ..., s(n+1)), ..."
    iters = tee(iterable, n)
    # Could use enumerate(islice(iters, 1, None), 1) to avoid consume(it, 0), but that's
    # slower for larger window sizes, while saving only small fixed "noop" cost
    for i, it in enumerate(iters):
        consume(it, i)
    return zip(*iters)

window 配方与 pairwise 相同,它只是将第二个 tee-ed 迭代器上的单个元素“消耗”替换为逐渐增加的 n - 1 迭代器上的消耗。使用consume 而不是将每个迭代器包装在islice 中会稍微快一些(对于足够大的可迭代对象),因为您只需在consume 阶段支付islice 包装开销,而不是在提取每个窗口值的过程中(所以它以n 为界,而不是iterable 中的项目数)。

在性能方面,与其他一些解决方案相比,这非常好(并且比我测试过的任何其他解决方案都更好)。在 Python 3.5.0、Linux x86-64 上测试,使用 ipython %timeit magic。

kindall's the deque solution,通过使用islice而不是自制生成器表达式来调整性能/正确性并测试结果长度,因此当可迭代对象短于窗口时它不会产生结果,以及通过dequemaxlen 在位置上而不是按关键字(对于较小的输入会产生惊人的差异):

>>> %timeit -r5 deque(windowkindall(range(10), 3), 0)
100000 loops, best of 5: 1.87 μs per loop
>>> %timeit -r5 deque(windowkindall(range(1000), 3), 0)
10000 loops, best of 5: 72.6 μs per loop
>>> %timeit -r5 deque(windowkindall(range(1000), 30), 0)
1000 loops, best of 5: 71.6 μs per loop

与以前改编的 kindall 解决方案相同,但每个 yield win 都更改为 yield tuple(win),因此从生成器存储结果有效,而所有存储的结果都不是最新结果的视图(所有其他合理的解决方案在此都是安全的场景),并将tuple=tuple添加到函数定义中,以将tuple的使用从LEGB中的B移动到L

>>> %timeit -r5 deque(windowkindalltupled(range(10), 3), 0)
100000 loops, best of 5: 3.05 μs per loop
>>> %timeit -r5 deque(windowkindalltupled(range(1000), 3), 0)
10000 loops, best of 5: 207 μs per loop
>>> %timeit -r5 deque(windowkindalltupled(range(1000), 30), 0)
1000 loops, best of 5: 348 μs per loop


>>> %timeit -r5 deque(windowconsume(range(10), 3), 0)
100000 loops, best of 5: 3.92 μs per loop
>>> %timeit -r5 deque(windowconsume(range(1000), 3), 0)
10000 loops, best of 5: 42.8 μs per loop
>>> %timeit -r5 deque(windowconsume(range(1000), 30), 0)
1000 loops, best of 5: 232 μs per loop

consume 相同,但内联else 的情况为consume 以避免函数调用和n is None 测试以减少运行时间,特别是对于设置开销是工作的重要部分的小输入:

>>> %timeit -r5 deque(windowinlineconsume(range(10), 3), 0)
100000 loops, best of 5: 3.57 μs per loop
>>> %timeit -r5 deque(windowinlineconsume(range(1000), 3), 0)
10000 loops, best of 5: 40.9 μs per loop
>>> %timeit -r5 deque(windowinlineconsume(range(1000), 30), 0)
1000 loops, best of 5: 211 μs per loop

(旁注:pairwise 上的一个变体,它使用默认参数 2 重复使用 tee 来生成嵌套的 tee 对象,因此任何给定的迭代器只能推进一次,而不是独立消耗越来越多的次,类似于MrDrFenner's answer 类似于非内联consume 并且在所有测试中都比内联consume 慢,因此为简洁起见,我省略了这些结果。

如您所见,如果您不关心调用者需要存储结果的可能性,我的 kindall 解决方案的优化版本大部分时间都胜出,除了“大可迭代,小窗口大小case"(其中内联 consume 获胜);它随着可迭代大小的增加而迅速降低,而随着窗口大小的增加根本不降低(所有其他解决方案随着可迭代大小的增加而降低得更慢,但也随着窗口大小的增加而降低)。它甚至可以通过包装 map(tuple, ...) 来适应“需要元组”的情况,它的运行速度比将元组放入函数中要慢一些,但它很简单(需要 1-5% 的时间)并让您保持灵活性当您可以容忍重复返回相同的值时运行得更快。

如果您需要对存储退货的安全性,内联 consume 在除最小输入大小之外的所有输入尺寸上都胜出(非内联 consume 稍慢但类似地缩放)。基于deque & tupling 的解决方案仅在最小输入的情况下获胜,因为设置成本较小,且增益较小;随着可迭代的变长,它会严重退化。

作为记录,我使用的yields tuples 的 kindall 解决方案的改编版本是:

def windowkindalltupled(iterable, n=2, tuple=tuple):
    it = iter(iterable)
    win = deque(islice(it, n), n)
    if len(win) < n:
    append = win.append
    yield tuple(win)
    for e in it:
        yield tuple(win)

删除函数定义行中tuple 的缓存并在每个yield 中使用tuple 以获得更快但不太安全的版本。


显然,这比它可能的效率要低; consume 是通用的(包括执行完整的consume 的能力),因此需要对n is None 进行额外的导入和每次使用测试。在实际代码中,当且仅当我确定性能是一个问题,或者我真的需要更简洁的代码时,我会考虑将 consumeelse 案例内联到 window 中,假设我没有使用consume 用于其他任何事情。但是如果性能没有被证明是一个问题,我会保留单独的定义;命名的consume 函数使操作不那么神奇/自我记录。【参考方案15】:

这是一个老问题,但对于那些仍然感兴趣的人来说,在this 页面(由 Adrian Rosebrock)中使用生成器实现了一个很好的窗口滑块。

它是 OpenCV 的一个实现,但是您可以轻松地将它用于任何其他目的。对于急切的人,我将代码粘贴在这里,但为了更好地理解它,我建议访问原始页面。

def sliding_window(image, stepSize, windowSize):
    # slide a window across the image
    for y in xrange(0, image.shape[0], stepSize):
        for x in xrange(0, image.shape[1], stepSize):
            # yield the current window
            yield (x, y, image[y:y + windowSize[1], x:x + windowSize[0]])






import more_itertools
list(more_itertools.windowed([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],n=3, step=3))

Out: [(1, 2, 3), (4, 5, 6), (7, 8, 9), (10, 11, 12), (13, 14, 15)]


step=3 实际上应该被删除以匹配 OP 的请求:list(more_itertools.windowed(range(6), 3))【参考方案17】:

修改 DiPaolo's answer 以允许任意填充和可变步长

import itertools
def window(seq, n=2,step=1,fill=None,keep=0):
    "Returns a sliding window (of width n) over data from the iterable"
    "   s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...                   "
    it = iter(seq)
    result = tuple(itertools.islice(it, n))    
    if len(result) == n:
        yield result
    while True:        
#         for elem in it:        
        elem = tuple( next(it, fill) for _ in range(step))
        result = result[step:] + elem        
        if elem[-1] is fill:
            if keep:
                yield result
        yield result


#Importing the numpy library
import numpy as np
arr = np.arange(6) #Sequence
window_size = 3
np.lib.stride_tricks.as_strided(arr, shape= (len(arr) - window_size +1, window_size), 
strides = arr.strides*2)

"""Example output:

  [0, 1, 2]
  [1, 2, 3]
  [2, 3, 4]
  [3, 4, 5]





from itertools import islice, tee

def window(iterable, size): 
    iterators = tee(iterable, size) 
    iterators = [islice(iterator, i, None) for i, iterator in enumerate(iterators)]  
    yield from zip(*iterators)

list(window(range(5), 3))
# [(0, 1, 2), (1, 2, 3), (2, 3, 4)]



这是一个单行。我对它进行了计时,它与最佳答案的性能相当,并且随着更大的 seq 从 len(seq) = 20 慢 20% 和 len(seq) = 10000 慢 7% 逐渐变得更好

zip(*[seq[i:(len(seq) - n - 1 + i)] for i in range(n)])


请在您的答案中添加一些解释性文字。并非每个偶然发现此线程的人都是 Python Ninja。 这是关闭 2,这有效: zip(*[seq[i:(len(seq) - n + 1 + i)] for i in range(n)])【参考方案21】:

尝试我的部分,使用 islice 的简单、单行、pythonic 方式。但是,可能不是最有效的。

from itertools import islice
array = range(0, 10)
window_size = 4
map(lambda i: list(islice(array, i, i + window_size)), range(0, len(array) - window_size + 1))
# output = [[0, 1, 2, 3], [1, 2, 3, 4], [2, 3, 4, 5], [3, 4, 5, 6], [4, 5, 6, 7], [5, 6, 7, 8], [6, 7, 8, 9]]

说明: 使用 window_size 的 islice 创建窗口,并使用 map 遍历所有数组。




import itertools
import sys

def windowed(l, stride):
    return zip(*[itertools.islice(l, i, sys.maxsize) for i in range(stride)])


看起来类似于这个答案的第一个解决方案:***.com/a/11249883/7851470 @georgy 我想我跳过了那个答案,因为它是用 Python2 编写的,但我同意,它本质上是一样的!【参考方案23】:


def SlidingWindow(X, window_length, stride):
    indexer = np.arange(window_length)[None, :] + stride*np.arange(int(len(X)/stride)-window_length+4)[:, None]
    return X.take(indexer)


import numpy as np
def SlidingWindow(X, window_length, stride1):
    stride=  X.shape[1]*stride1
    window_length = window_length*X.shape[1]
    indexer = np.arange(window_length)[None, :] + stride1*np.arange(int(len(X)/stride1)-window_length-1)[:, None]
    return X.take(indexer)



我的两个版本的window 实现

from typing import Sized, Iterable

def window(seq: Sized, n: int, strid: int = 1, drop_last: bool = False):
    for i in range(0, len(seq), strid):
        res = seq[i:i + n]
        if drop_last and len(res) < n:
        yield res

def window2(seq: Iterable, n: int, strid: int = 1, drop_last: bool = False):
    it = iter(seq)
    result = []
    step = 0
    for i, ele in enumerate(it):
        result = result[-n:]
        if len(result) == n:
            if step % strid == 0:
                yield result
            step += 1
    if not drop_last:
        yield result




from collections import deque

def window(ls,window_size=3):
    window = deque(maxlen=window_size)

    for element in ls:
        if len(window)==window_size:
            yield list(window)

ls = [0,1,2,3,4,5]

for w in window(ls):




def sliding_window(items, size):
    return [items[start:end] for start, end
            in zip(range(0, len(items) - size + 1), range(size, len(items) + 1))]

不用说,items 序列需要是可切片的。使用索引并不理想,但考虑到替代方案,它似乎是最不坏的选择......这也可以轻松更改为生成器:只需将 [...] 替换为 (...)




