如何不错过 itertools.takewhile() 之后的下一个元素

Posted

技术标签:

【中文标题】如何不错过 itertools.takewhile() 之后的下一个元素【英文标题】:How not to miss the next element after itertools.takewhile() 【发布时间】:2015-06-03 09:11:53 【问题描述】:

假设我们希望处理一个迭代器并希望按块处理它。 每个块的逻辑取决于先前计算的块,因此groupby() 没有帮助。

在这种情况下,我们的朋友是 itertools.takewhile():

while True:
    chunk = itertools.takewhile(getNewChunkLogic(), myIterator)
    process(chunk)

问题是takewhile() 需要越过满足新块逻辑的最后一个元素,从而“吃掉”下一个块的第一个元素。

对此有多种解决方案,包括包装或 à la C 的 ungetc() 等。 我的问题是:有没有优雅的解决方案?

【问题讨论】:

取出 cython 并创建自己的 【参考方案1】:

takewhile() 确实需要查看下一个元素以确定何时切换行为。

您可以使用包装器来跟踪最后看到的元素,并且可以“重置”以备份一个元素:

_sentinel = object()

class OneStepBuffered(object):
    def __init__(self, it):
        self._it = iter(it)
        self._last = _sentinel
        self._next = _sentinel
    def __iter__(self):
        return self
    def __next__(self):
        if self._next is not _sentinel:
            next_val, self._next = self._next, _sentinel
            return next_val
        try:
            self._last = next(self._it)
            return self._last
        except StopIteration:
            self._last = self._next = _sentinel
            raise
    next = __next__  # Python 2 compatibility
    def step_back(self):
        if self._last is _sentinel:
            raise ValueError("Can't back up a step")
        self._next, self._last = self._last, _sentinel

在与takewhile() 一起使用之前,将您的迭代器包装在其中:

myIterator = OneStepBuffered(myIterator)
while True:
    chunk = itertools.takewhile(getNewChunkLogic(), myIterator)
    process(chunk)
    myIterator.step_back()

演示:

>>> from itertools import takewhile
>>> test_list = range(10)
>>> iterator = OneStepBuffered(test_list)
>>> list(takewhile(lambda i: i < 5, iterator))
[0, 1, 2, 3, 4]
>>> iterator.step_back()
>>> list(iterator)
[5, 6, 7, 8, 9]

【讨论】:

@KarolyHorvath:如果性能被杀死,这取决于原始迭代器的编码方式。如果其他所有内容都是用 C 编码的,那么是的,这会在 Python 解释器中增加一步,这会影响性能。另一种方法是重新调整您的算法以不依赖takewhile()。如果没有关于你在做什么的详细信息,我目前无法提供帮助。 @MartijnPieters:是的,这就是我的意思。我什么都没做,OP 是其他人 ;) 至于ValueError:从您的OneStepBuffered 的角度来看,这是完全合理的,但对于 OP 的任务,这对我来说似乎是一个错误。 好吧,假设这是一个有效的场景......注意:如果是这种情况,您可以捕获异常 假设一个无穷无尽但速度很快的迭代器。【参考方案2】:

鉴于可调用的GetNewChunkLogic() 将在第一个块上报告True,然后是False。 以下sn -p

    解决了takewhile 的“额外下一步”问题。 很优雅,因为您不必实现后退一步的逻辑。

def partition(pred, iterable):
    'Use a predicate to partition entries into true entries and false entries'
    # partition(is_odd, range(10)) -->  1 3 5 7 9 and 0 2 4 6 8
    t1, t2 = tee(iterable)
    return filter(pred, t1), filterfalse(pred, t2)

while True:
    head, tail = partition(GetNewChunkLogic(), myIterator)
    process(head)
    myIterator = tail

但是,最优雅的方法是将您的 GetNewChunkLogic 修改为生成器并删除 while 循环。

【讨论】:

【参考方案3】:

这是另一种方法。当谓词失败时产生一个值 (sentinel),但 before 产生值本身。然后按不是sentinel 的值分组。

这里,group_by_predicate 需要一个返回谓词 (pred_gen) 的函数。每次谓词失败时都会重新创建:

from itertools import groupby


def group_by_predicate(predicate_gen, _iter):
    sentinel = object()
    def _group_with_sentinel():
        pred = predicate_gen()
        for n in _iter:
            while not pred(n):
                yield sentinel
                pred = predicate_gen()
            yield n
            
    g = _group_with_sentinel()
    for k, g in groupby(g, lambda s: s!=sentinel):
        if k:
            yield g

然后可以这样使用:

def less_than_gen(maxn):
    """Return a predicate that returns true while the sum of inputs is < maxn"""
    def pred(i):
        pred.count += i
        return pred.count < maxn
    pred.count = 0
    return pred

data = iter(list(range(9)) * 3)

for g in group_by_predicate(lambda: less_than_gen(15), data):
    print(list(g))

输出总和小于15的数字组:

[0, 1, 2, 3, 4]
[5, 6]
[7]
[8, 0, 1, 2, 3]
[4, 5]
[6, 7]
[8, 0, 1, 2, 3]
[4, 5]
[6, 7]
[8]

【讨论】:

【参考方案4】:

我遇到了同样的问题。您可能希望使用 itertools.teeitertools.pairwise(Python 3.10 中的新功能)来处理这个问题,但我认为这些解决方案不是很优雅。

我发现最好的方法就是重写 takewhile。主要基于documentation:

def takewhile_inclusive(predicate, it):
  for x in it:
    if predicate(x):
      yield x
    else:
      yield x
      break

在您的循环中,您可以优雅地使用解包单独处理最终元素:

*chunk,lastPiece = takewhile_inclusive(getNewChunkLogic(), myIterator)

然后你可以链接最后一块:

lastPiece = None
while True:
  *chunk,lastPiece = takewhile_inclusive(getNewChunkLogic(), myIterator)
  if lastPiece is not None:
    myIterator = itertools.chain([lastPiece], myIterator))
  

【讨论】:

以上是关于如何不错过 itertools.takewhile() 之后的下一个元素的主要内容,如果未能解决你的问题,请参考以下文章

首席信息安全官与首席数据官如何不错过合作机会?

Xcode 错过了“推送到远程”

一个可能的 Resharper 错误还是我只是错过了一些微妙的东西?

理解 Android 元视口缩放:我错过了啥?

行动电缆:如何传递错过的聊天消息

Python中不可错过的五个超有用的神仙级函数