滚动或滑动窗口迭代器?

Posted

技术标签:

【中文标题】滚动或滑动窗口迭代器?【英文标题】:Rolling or sliding window iterator? 【发布时间】:2011-10-12 22:58:25 【问题描述】:

我需要一个可在序列/迭代器/生成器上迭代的滚动窗口(又名滑动窗口)。默认 Python 迭代可以被认为是一种特殊情况,其中窗口长度为 1。我目前正在使用以下代码。有没有人有更 Pythonic、更少冗长或更有效的方法来做到这一点?

def rolling_window(seq, window_size):
    it = iter(seq)
    win = [it.next() for cnt in xrange(window_size)] # First window
    yield win
    for e in it: # Subsequent windows
        win[:-1] = win[1:]
        win[-1] = e
        yield win

if __name__=="__main__":
    for w in rolling_window(xrange(6), 3):
        print w

"""Example output:

   [0, 1, 2]
   [1, 2, 3]
   [2, 3, 4]
   [3, 4, 5]
"""

【问题讨论】:

如果您希望在迭代时对每个窗口执行某种操作(例如 sum()max()),请记住有有效的算法来计算新的恒定时间内每个窗口的值(与窗口大小无关)。我在 Python 库中收集了其中一些算法:rolling。 【参考方案1】:

这似乎是为collections.deque 量身定做的,因为您基本上有一个 FIFO(添加到一端,从另一端移除)。但是,即使您使用list,也不应该切片两次;相反,您可能应该只是列表中的pop(0) 和新项目中的append()

这是一个优化的基于双端队列的实现,仿照您的原始模式:

from collections import deque

def window(seq, n=2):
    it = iter(seq)
    win = deque((next(it, None) for _ in xrange(n)), maxlen=n)
    yield win
    append = win.append
    for e in it:
        append(e)
        yield win

在我的测试中,它大部分时间都轻松地击败了此处发布的所有其他内容,尽管 pillmuncher 的 tee 版本在大可迭代对象和小窗口方面胜过它。在较大的窗口上,deque 再次以原始速度领先。

访问deque 中的单个项目可能比使用列表或元组更快或更慢。 (如果使用负索引,靠近开头的项目更快,或者靠近结尾的项目。)我在循环体中放了一个sum(w);这发挥了双端队列的优势(从一个项目迭代到下一个项目很快,所以这个循环的运行速度比下一个最快的方法,pilmuncher 的方法快 20%)。当我将其更改为在 10 个窗口中单独查找和添加项目时,表格发生了翻天覆地的变化,tee 方法的速度提高了 20%。通过在最后五个术语中使用负索引,我能够恢复一些速度,但tee 仍然快一点。总体而言,我估计任何一种对于大多数用途来说都足够快,如果您需要更高的性能,请分析并选择效果最好的一种。

【讨论】:

yield win 应该是 yield tuple(win)yield list(win) 以防止返回对同一 deque 对象的引用的迭代器。 我提交了这个to PyPI。使用pip install sliding_window 安装,使用from sliding_window import window 运行。 如果您认为 list(window(range(10))) 应该产生类似 [[0,1],[1,2],[2,3],...] 的内容,您会大吃一惊的。跨度> 显然不会;您需要执行类似list(list(x) for x in window(range(10))) 的操作,否则将其添加到迭代器中。对于某些应用程序来说,这很重要,而对于其他应用程序则不重要,因为我追求速度,所以我选择 not 并让调用者有责任在需要时复制窗口。 如果你在yield之前加回需要的tuple(),这种方法与其他方法相比没有任何优势。【参考方案2】:

旧版本的 Python 文档中有一个 itertools examples:

from itertools import islice

def window(seq, n=2):
    "Returns a sliding window (of width n) over data from the iterable"
    "   s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...                   "
    it = iter(seq)
    result = tuple(islice(it, n))
    if len(result) == n:
        yield result
    for elem in it:
        result = result[1:] + (elem,)
        yield result

文档中的那个更简洁一些,并使用itertools 来达到我想象的更大效果。


如果您的迭代器是一个简单的列表/元组,以指定的窗口大小在其中滑动的简单方法是:

seq = [0, 1, 2, 3, 4, 5]
window_size = 3

for i in range(len(seq) - window_size + 1):
    print(seq[i: i + window_size])

输出:

[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]

【讨论】:

不错的答案,但是(我知道你只是在复制链接的配方),我想知道为什么默认窗口大小应该是 2?它应该有一个默认值吗? @TakenMacGuy:我不知道该配方的作者的推理是什么,但我也会选择 2。2 是最小的有用窗口大小(否则你只是在迭代并且不需要窗口),并且需要知道上一个(或下一个)项目也很常见,可以说比任何其他特定的 n 都要多。 有谁知道为什么这个例子被从文档中删除了?是不是有什么问题,或者现在有更简单的选择? 对示例删除感到好奇,发现rhettinger committed on Oct 26, 2003: Replace the window() example with pairwise() which demonstrates tee(). 什么时候进入for elem in it循环?【参考方案3】:

我喜欢tee()

from itertools import tee, izip

def window(iterable, size):
    iters = tee(iterable, size)
    for i in xrange(1, size):
        for each in iters[i:]:
            next(each, None)
    return izip(*iters)

for each in window(xrange(6), 3):
    print list(each)

给予:

[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]

【讨论】:

从我的快速timeit 测试来看,这比 Daniel DePaolo 的慢得多(大约 2:1 的比例),并且感觉并不“更好”。 @David B.:在我的机器上,它只比 Daniel DePaolo 的慢 8%。 @pillmuncher:Python 2.7 还是 3.x?我用的是2.7。该比率对size 的值也相当敏感。如果增加它(例如,如果可迭代对象的长度为 100000 个元素,则将窗口大小设置为 1000),您可能会看到增加。 @David B.:你说的很有道理。在我的代码中,iters 的设置时间为 O(size!),并且多次调用next()(在izip() 中)可能比复制一个元组两次更耗时。我使用的是 Python 2.6.5,顺便说一句。 @pillmuncher:你的意思是,iters的设置时间是 O(size^2),对吧?【参考方案4】:

我使用下面的代码作为一个简单的滑动窗口,它使用生成器来显着提高可读性。以我的经验,它的速度到目前为止足以用于生物信息学序列分析。

我在这里包含它是因为我还没有看到使用这种方法。同样,我没有对其比较性能发表任何声明。

def slidingWindow(sequence,winSize,step=1):
"""Returns a generator that will iterate through
the defined chunks of input sequence. Input sequence
must be sliceable."""

    # Verify the inputs
    if not ((type(winSize) == type(0)) and (type(step) == type(0))):
        raise Exception("**ERROR** type(winSize) and type(step) must be int.")
    if step > winSize:
        raise Exception("**ERROR** step must not be larger than winSize.")
    if winSize > len(sequence):
        raise Exception("**ERROR** winSize must not be larger than sequence length.")

    # Pre-compute number of chunks to emit
    numOfChunks = ((len(sequence)-winSize)/step)+1

    # Do the work
    for i in range(0,numOfChunks*step,step):
        yield sequence[i:i+winSize]

【讨论】:

这里的主要缺点是len(sequence) 调用。如果sequence 是迭代器或生成器,这将不起作用。当输入确实适合内存时,这确实提供了比迭代器更具可读性的解决方案。 是的,你是对的。这种特殊情况最初是为了扫描通常表示为字符串的 DNA 序列。它当然有你提到的限制。如果你愿意,你可以简单地测试每个切片以确保它仍然是正确的长度,然后忘记必须知道整个序列的长度。但这会增加一点开销(每次迭代都进行一次 len() 测试)。【参考方案5】:

只是一个快速的贡献。

由于当前的 python 文档在 itertool 示例中没有“窗口”(即,在http://docs.python.org/library/itertools.html 的底部),这里有一个基于 grouper 的代码,这是给出的示例之一:

import itertools as it
def window(iterable, size):
    shiftedStarts = [it.islice(iterable, s, None) for s in xrange(size)]
    return it.izip(*shiftedStarts)

基本上,我们创建了一系列切片迭代器,每个迭代器都有一个起点,向前一点。然后,我们将它们压缩在一起。注意,这个函数返回一个生成器(它本身并不是一个生成器)。

与上面的附加元素和高级迭代器版本非常相似,性能(即最佳)随列表大小和窗口大小而变化。我喜欢这个,因为它是两行的(可以是单行的,但我更喜欢命名概念)。

原来上面的代码是错误的。如果传递给 iterable 的参数是一个序列,但如果它是一个迭代器,它就可以工作。如果它是一个迭代器,则在 islice 调用之间共享(但不是 tee'd)相同的迭代器,这会严重破坏事情。

这是一些固定的代码:

import itertools as it
def window(iterable, size):
    itrs = it.tee(iterable, size)
    shiftedStarts = [it.islice(anItr, s, None) for s, anItr in enumerate(itrs)]
    return it.izip(*shiftedStarts)

另外,书籍的另一个版本。这个版本不是复制一个迭代器然后多次推进副本,而是在我们向前移动起始位置时对每个迭代器进行成对副本。因此,迭代器 t 既提供了起点为 t 的“完整”迭代器,也提供了创建迭代器 t + 1 的基础:

import itertools as it
def window4(iterable, size):
    complete_itr, incomplete_itr = it.tee(iterable, 2)
    iters = [complete_itr]
    for i in xrange(1, size):
        incomplete_itr.next()
        complete_itr, incomplete_itr = it.tee(incomplete_itr, 2)
        iters.append(complete_itr)
    return it.izip(*iters)

【讨论】:

【参考方案6】:

这是一个概括,增加了对stepfillvalue 参数的支持:

from collections import deque
from itertools import islice

def sliding_window(iterable, size=2, step=1, fillvalue=None):
    if size < 0 or step < 1:
        raise ValueError
    it = iter(iterable)
    q = deque(islice(it, size), maxlen=size)
    if not q:
        return  # empty iterable or size == 0
    q.extend(fillvalue for _ in range(size - len(q)))  # pad to size
    while True:
        yield iter(q)  # iter() to avoid accidental outside modifications
        try:
            q.append(next(it))
        except StopIteration: # Python 3.5 pep 479 support
            return
        q.extend(next(it, fillvalue) for _ in range(step - 1))

它在每次迭代时以块 size 的形式产生项目滚动 step 位置,如有必要,用 fillvalue 填充每个块。 size=4, step=3, fillvalue='*' 的示例:

 [a b c d]e f g h i j k l m n o p q r s t u v w x y z
  a b c[d e f g]h i j k l m n o p q r s t u v w x y z
  a b c d e f[g h i j]k l m n o p q r s t u v w x y z
  a b c d e f g h i[j k l m]n o p q r s t u v w x y z
  a b c d e f g h i j k l[m n o p]q r s t u v w x y z
  a b c d e f g h i j k l m n o[p q r s]t u v w x y z
  a b c d e f g h i j k l m n o p q r[s t u v]w x y z
  a b c d e f g h i j k l m n o p q r s t u[v w x y]z
  a b c d e f g h i j k l m n o p q r s t u v w x[y z * *]

有关step 参数的用例示例,请参阅Processing a large .txt file in python efficiently。

【讨论】:

【参考方案7】:

双端队列窗口的略微修改版本,使其成为真正的滚动窗口。所以它开始只填充一个元素,然后增长到它的最大窗口大小,然后随着它的左边缘接近结束而缩小:

from collections import deque
def window(seq, n=2):
    it = iter(seq)
    win = deque((next(it, None) for _ in xrange(1)), maxlen=n)
    yield win
    append = win.append
    for e in it:
        append(e)
        yield win
    for _ in xrange(len(win)-1):
        win.popleft()
        yield win

for wnd in window(range(5), n=3):
    print(list(wnd))

这给了

[0]
[0, 1]
[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4]
[4]

【讨论】:

【参考方案8】:
>>> n, m = 6, 3
>>> k = n - m+1
>>> print ('\n'*(k)).format(*[range(i, i+m) for i in xrange(k)])
[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]

【讨论】:

【参考方案9】:

多个迭代器!

def window(seq, size, step=1):
    # initialize iterators
    iters = [iter(seq) for i in range(size)]
    # stagger iterators (without yielding)
    [next(iters[i]) for j in range(size) for i in range(-1, -j-1, -1)]
    while(True):
        yield [next(i) for i in iters]
        # next line does nothing for step = 1 (skips iterations for step > 1)
        [next(i) for i in iters for j in range(step-1)]

next(it) 在序列完成时引发StopIteration,出于某种我无法理解的酷原因,此处的 yield 语句将其排除在外,函数返回,忽略不形成完整窗口的剩余值。

无论如何,这是行数最少的解决方案,其唯一要求是seq 实现__iter____getitem__,并且不依赖itertoolscollections 除了@dansalmo 的解决方案:)

【讨论】:

注意:交错步骤是 O(n^2),其中 n 是窗口的大小,并且仅在第一次调用时发生。它可以优化到 O(n),但它会使代码有点混乱:P【参考方案10】:
def rolling_window(list, degree):
    for i in range(len(list)-degree+1):
        yield [list[i+o] for o in range(degree)]

将此用于滚动平均函数

【讨论】:

[list[i+o] for o in range(degree)] 等价于list[i:i+degree]【参考方案11】:

为什么不

def pairwise(iterable):
    "s -> (s0,s1), (s1,s2), (s2, s3), ..."
    a, b = tee(iterable)
    next(b, None)
    return zip(a, b)

它记录在 Python doc 中。 您可以轻松地将其扩展到更宽的窗口。

【讨论】:

【参考方案12】:

如何使用以下内容:

mylist = [1, 2, 3, 4, 5, 6, 7]

def sliding_window(l, window_size=2):
    if window_size > len(l):
        raise ValueError("Window size must be smaller or equal to the number of elements in the list.")

    t = []
    for i in xrange(0, window_size):
        t.append(l[i:])

    return zip(*t)

print sliding_window(mylist, 3)

输出:

[(1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6), (5, 6, 7)]

【讨论】:

@keocra zip(*t) 是什么意思?我在哪里可以找到关于这种声明的一些文档? Python 2.7:docs.python.org/2/library/functions.html#zip,星号解包列表并将单个元素作为输入提供给 zip (unpacking arguments)【参考方案13】:
def GetShiftingWindows(thelist, size):
    return [ thelist[x:x+size] for x in range( len(thelist) - size + 1 ) ]

>> a = [1, 2, 3, 4, 5]
>> GetShiftingWindows(a, 3)
[ [1, 2, 3], [2, 3, 4], [3, 4, 5] ]

【讨论】:

你在 Python 中看到“range(len”) 的那一刻,就是代码异味。 @MarkLawrence 是什么让你认为 range(len 在 python 中是一个糟糕的模式?【参考方案14】:

只是为了展示如何组合itertools recipes,我将使用consume 配方将pairwise 配方尽可能直接地扩展回window 配方:

def consume(iterator, n):
    "Advance the iterator n-steps ahead. If n is none, consume entirely."
    # Use functions that consume iterators at C speed.
    if n is None:
        # feed the entire iterator into a zero-length deque
        collections.deque(iterator, maxlen=0)
    else:
        # advance to the empty slice starting at position n
        next(islice(iterator, n, n), None)

def window(iterable, n=2):
    "s -> (s0, ...,s(n-1)), (s1, ...,sn), (s2, ..., s(n+1)), ..."
    iters = tee(iterable, n)
    # Could use enumerate(islice(iters, 1, None), 1) to avoid consume(it, 0), but that's
    # slower for larger window sizes, while saving only small fixed "noop" cost
    for i, it in enumerate(iters):
        consume(it, i)
    return zip(*iters)

window 配方与 pairwise 相同,它只是将第二个 tee-ed 迭代器上的单个元素“消耗”替换为逐渐增加的 n - 1 迭代器上的消耗。使用consume 而不是将每个迭代器包装在islice 中会稍微快一些(对于足够大的可迭代对象),因为您只需在consume 阶段支付islice 包装开销,而不是在提取每个窗口值的过程中(所以它以n 为界,而不是iterable 中的项目数)。

在性能方面,与其他一些解决方案相比,这非常好(并且比我测试过的任何其他解决方案都更好)。在 Python 3.5.0、Linux x86-64 上测试,使用 ipython %timeit magic。

kindall's the deque solution,通过使用islice而不是自制生成器表达式来调整性能/正确性并测试结果长度,因此当可迭代对象短于窗口时它不会产生结果,以及通过dequemaxlen 在位置上而不是按关键字(对于较小的输入会产生惊人的差异):

>>> %timeit -r5 deque(windowkindall(range(10), 3), 0)
100000 loops, best of 5: 1.87 μs per loop
>>> %timeit -r5 deque(windowkindall(range(1000), 3), 0)
10000 loops, best of 5: 72.6 μs per loop
>>> %timeit -r5 deque(windowkindall(range(1000), 30), 0)
1000 loops, best of 5: 71.6 μs per loop

与以前改编的 kindall 解决方案相同,但每个 yield win 都更改为 yield tuple(win),因此从生成器存储结果有效,而所有存储的结果都不是最新结果的视图(所有其他合理的解决方案在此都是安全的场景),并将tuple=tuple添加到函数定义中,以将tuple的使用从LEGB中的B移动到L

>>> %timeit -r5 deque(windowkindalltupled(range(10), 3), 0)
100000 loops, best of 5: 3.05 μs per loop
>>> %timeit -r5 deque(windowkindalltupled(range(1000), 3), 0)
10000 loops, best of 5: 207 μs per loop
>>> %timeit -r5 deque(windowkindalltupled(range(1000), 30), 0)
1000 loops, best of 5: 348 μs per loop

以上基于consume的解决方案:

>>> %timeit -r5 deque(windowconsume(range(10), 3), 0)
100000 loops, best of 5: 3.92 μs per loop
>>> %timeit -r5 deque(windowconsume(range(1000), 3), 0)
10000 loops, best of 5: 42.8 μs per loop
>>> %timeit -r5 deque(windowconsume(range(1000), 30), 0)
1000 loops, best of 5: 232 μs per loop

consume 相同,但内联else 的情况为consume 以避免函数调用和n is None 测试以减少运行时间,特别是对于设置开销是工作的重要部分的小输入:

>>> %timeit -r5 deque(windowinlineconsume(range(10), 3), 0)
100000 loops, best of 5: 3.57 μs per loop
>>> %timeit -r5 deque(windowinlineconsume(range(1000), 3), 0)
10000 loops, best of 5: 40.9 μs per loop
>>> %timeit -r5 deque(windowinlineconsume(range(1000), 30), 0)
1000 loops, best of 5: 211 μs per loop

(旁注:pairwise 上的一个变体,它使用默认参数 2 重复使用 tee 来生成嵌套的 tee 对象,因此任何给定的迭代器只能推进一次,而不是独立消耗越来越多的次,类似于MrDrFenner's answer 类似于非内联consume 并且在所有测试中都比内联consume 慢,因此为简洁起见,我省略了这些结果。

如您所见,如果您不关心调用者需要存储结果的可能性,我的 kindall 解决方案的优化版本大部分时间都胜出,除了“大可迭代,小窗口大小case"(其中内联 consume 获胜);它随着可迭代大小的增加而迅速降低,而随着窗口大小的增加根本不降低(所有其他解决方案随着可迭代大小的增加而降低得更慢,但也随着窗口大小的增加而降低)。它甚至可以通过包装 map(tuple, ...) 来适应“需要元组”的情况,它的运行速度比将元组放入函数中要慢一些,但它很简单(需要 1-5% 的时间)并让您保持灵活性当您可以容忍重复返回相同的值时运行得更快。

如果您需要对存储退货的安全性,内联 consume 在除最小输入大小之外的所有输入尺寸上都胜出(非内联 consume 稍慢但类似地缩放)。基于deque & tupling 的解决方案仅在最小输入的情况下获胜,因为设置成本较小,且增益较小;随着可迭代的变长,它会严重退化。

作为记录,我使用的yields tuples 的 kindall 解决方案的改编版本是:

def windowkindalltupled(iterable, n=2, tuple=tuple):
    it = iter(iterable)
    win = deque(islice(it, n), n)
    if len(win) < n:
        return
    append = win.append
    yield tuple(win)
    for e in it:
        append(e)
        yield tuple(win)

删除函数定义行中tuple 的缓存并在每个yield 中使用tuple 以获得更快但不太安全的版本。

【讨论】:

显然,这比它可能的效率要低; consume 是通用的(包括执行完整的consume 的能力),因此需要对n is None 进行额外的导入和每次使用测试。在实际代码中,当且仅当我确定性能是一个问题,或者我真的需要更简洁的代码时,我会考虑将 consumeelse 案例内联到 window 中,假设我没有使用consume 用于其他任何事情。但是如果性能没有被证明是一个问题,我会保留单独的定义;命名的consume 函数使操作不那么神奇/自我记录。【参考方案15】:

这是一个老问题,但对于那些仍然感兴趣的人来说,在this 页面(由 Adrian Rosebrock)中使用生成器实现了一个很好的窗口滑块。

它是 OpenCV 的一个实现,但是您可以轻松地将它用于任何其他目的。对于急切的人,我将代码粘贴在这里,但为了更好地理解它,我建议访问原始页面。

def sliding_window(image, stepSize, windowSize):
    # slide a window across the image
    for y in xrange(0, image.shape[0], stepSize):
        for x in xrange(0, image.shape[1], stepSize):
            # yield the current window
            yield (x, y, image[y:y + windowSize[1], x:x + windowSize[0]])

提示:您可以在迭代生成器时检查窗口的.shape,丢弃不符合您要求的那些

干杯

【讨论】:

【参考方案16】:

有一个库可以满足您的需要:

import more_itertools
list(more_itertools.windowed([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],n=3, step=3))

Out: [(1, 2, 3), (4, 5, 6), (7, 8, 9), (10, 11, 12), (13, 14, 15)]

【讨论】:

step=3 实际上应该被删除以匹配 OP 的请求:list(more_itertools.windowed(range(6), 3))【参考方案17】:

修改 DiPaolo's answer 以允许任意填充和可变步长

import itertools
def window(seq, n=2,step=1,fill=None,keep=0):
    "Returns a sliding window (of width n) over data from the iterable"
    "   s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...                   "
    it = iter(seq)
    result = tuple(itertools.islice(it, n))    
    if len(result) == n:
        yield result
    while True:        
#         for elem in it:        
        elem = tuple( next(it, fill) for _ in range(step))
        result = result[step:] + elem        
        if elem[-1] is fill:
            if keep:
                yield result
            break
        yield result

【讨论】:

【参考方案18】:
#Importing the numpy library
import numpy as np
arr = np.arange(6) #Sequence
window_size = 3
np.lib.stride_tricks.as_strided(arr, shape= (len(arr) - window_size +1, window_size), 
strides = arr.strides*2)

"""Example output:

  [0, 1, 2]
  [1, 2, 3]
  [2, 3, 4]
  [3, 4, 5]

"""

【讨论】:

请写一些关于你的答案的文字。【参考方案19】:

让它变得懒惰!

from itertools import islice, tee

def window(iterable, size): 
    iterators = tee(iterable, size) 
    iterators = [islice(iterator, i, None) for i, iterator in enumerate(iterators)]  
    yield from zip(*iterators)

list(window(range(5), 3))
# [(0, 1, 2), (1, 2, 3), (2, 3, 4)]

【讨论】:

【参考方案20】:

这是一个单行。我对它进行了计时,它与最佳答案的性能相当,并且随着更大的 seq 从 len(seq) = 20 慢 20% 和 len(seq) = 10000 慢 7% 逐渐变得更好

zip(*[seq[i:(len(seq) - n - 1 + i)] for i in range(n)])

【讨论】:

请在您的答案中添加一些解释性文字。并非每个偶然发现此线程的人都是 Python Ninja。 这是关闭 2,这有效: zip(*[seq[i:(len(seq) - n + 1 + i)] for i in range(n)])【参考方案21】:

尝试我的部分,使用 islice 的简单、单行、pythonic 方式。但是,可能不是最有效的。

from itertools import islice
array = range(0, 10)
window_size = 4
map(lambda i: list(islice(array, i, i + window_size)), range(0, len(array) - window_size + 1))
# output = [[0, 1, 2, 3], [1, 2, 3, 4], [2, 3, 4, 5], [3, 4, 5, 6], [4, 5, 6, 7], [5, 6, 7, 8], [6, 7, 8, 9]]

说明: 使用 window_size 的 islice 创建窗口,并使用 map 遍历所有数组。

【讨论】:

【参考方案22】:

我测试了几个解决方案,我想出了一个,发现我想出的那个是最快的,所以我想我会分享它。

import itertools
import sys

def windowed(l, stride):
    return zip(*[itertools.islice(l, i, sys.maxsize) for i in range(stride)])

【讨论】:

看起来类似于这个答案的第一个解决方案:***.com/a/11249883/7851470 @georgy 我想我跳过了那个答案,因为它是用 Python2 编写的,但我同意,它本质上是一样的!【参考方案23】:

深度学习中滑动窗口数据的优化函数

def SlidingWindow(X, window_length, stride):
    indexer = np.arange(window_length)[None, :] + stride*np.arange(int(len(X)/stride)-window_length+4)[:, None]
    return X.take(indexer)

应用于多维数组

import numpy as np
def SlidingWindow(X, window_length, stride1):
    stride=  X.shape[1]*stride1
    window_length = window_length*X.shape[1]
    indexer = np.arange(window_length)[None, :] + stride1*np.arange(int(len(X)/stride1)-window_length-1)[:, None]
    return X.take(indexer)

【讨论】:

【参考方案24】:

我的两个版本的window 实现

from typing import Sized, Iterable

def window(seq: Sized, n: int, strid: int = 1, drop_last: bool = False):
    for i in range(0, len(seq), strid):
        res = seq[i:i + n]
        if drop_last and len(res) < n:
            break
        yield res


def window2(seq: Iterable, n: int, strid: int = 1, drop_last: bool = False):
    it = iter(seq)
    result = []
    step = 0
    for i, ele in enumerate(it):
        result.append(ele)
        result = result[-n:]
        if len(result) == n:
            if step % strid == 0:
                yield result
            step += 1
    if not drop_last:
        yield result

【讨论】:

【参考方案25】:

另一种从列表生成固定长度窗口的简单方法

from collections import deque

def window(ls,window_size=3):
    window = deque(maxlen=window_size)

    for element in ls:
        
        if len(window)==window_size:
            yield list(window)
        window.append(element)

ls = [0,1,2,3,4,5]

for w in window(ls):
    print(w)

【讨论】:

【参考方案26】:

我最终使用的(保持简单)解决方案:

def sliding_window(items, size):
    return [items[start:end] for start, end
            in zip(range(0, len(items) - size + 1), range(size, len(items) + 1))]

不用说,items 序列需要是可切片的。使用索引并不理想,但考虑到替代方案,它似乎是最不坏的选择......这也可以轻松更改为生成器:只需将 [...] 替换为 (...)

【讨论】:

以上是关于滚动或滑动窗口迭代器?的主要内容,如果未能解决你的问题,请参考以下文章

循环滑动窗口迭代

flink 滚动窗口滑动窗口会话窗口全局窗口

如何实现滑动窗口或减少这些嵌套循环?

11.Flink四大基石Window窗口的分类Flink提供了很多各种场景用的WindowAssigner基于时间的滚动和滑动基于时间的滚动和滑动窗口基于数量的滚动和滑动

11.Flink四大基石Window窗口的分类Flink提供了很多各种场景用的WindowAssigner基于时间的滚动和滑动基于时间的滚动和滑动窗口基于数量的滚动和滑动

Flink 滚动窗口滑动窗口详解