如何不错过 itertools.takewhile() 之后的下一个元素
Posted
技术标签:
【中文标题】如何不错过 itertools.takewhile() 之后的下一个元素【英文标题】:How not to miss the next element after itertools.takewhile() 【发布时间】:2015-06-03 09:11:53 【问题描述】:假设我们希望处理一个迭代器并希望按块处理它。
每个块的逻辑取决于先前计算的块,因此groupby()
没有帮助。
在这种情况下,我们的朋友是 itertools.takewhile():
while True:
chunk = itertools.takewhile(getNewChunkLogic(), myIterator)
process(chunk)
问题是takewhile()
需要越过满足新块逻辑的最后一个元素,从而“吃掉”下一个块的第一个元素。
对此有多种解决方案,包括包装或 à la C 的 ungetc()
等。
我的问题是:有没有优雅的解决方案?
【问题讨论】:
取出 cython 并创建自己的 【参考方案1】:takewhile()
确实需要查看下一个元素以确定何时切换行为。
您可以使用包装器来跟踪最后看到的元素,并且可以“重置”以备份一个元素:
_sentinel = object()
class OneStepBuffered(object):
def __init__(self, it):
self._it = iter(it)
self._last = _sentinel
self._next = _sentinel
def __iter__(self):
return self
def __next__(self):
if self._next is not _sentinel:
next_val, self._next = self._next, _sentinel
return next_val
try:
self._last = next(self._it)
return self._last
except StopIteration:
self._last = self._next = _sentinel
raise
next = __next__ # Python 2 compatibility
def step_back(self):
if self._last is _sentinel:
raise ValueError("Can't back up a step")
self._next, self._last = self._last, _sentinel
在与takewhile()
一起使用之前,将您的迭代器包装在其中:
myIterator = OneStepBuffered(myIterator)
while True:
chunk = itertools.takewhile(getNewChunkLogic(), myIterator)
process(chunk)
myIterator.step_back()
演示:
>>> from itertools import takewhile
>>> test_list = range(10)
>>> iterator = OneStepBuffered(test_list)
>>> list(takewhile(lambda i: i < 5, iterator))
[0, 1, 2, 3, 4]
>>> iterator.step_back()
>>> list(iterator)
[5, 6, 7, 8, 9]
【讨论】:
@KarolyHorvath:如果性能被杀死,这取决于原始迭代器的编码方式。如果其他所有内容都是用 C 编码的,那么是的,这会在 Python 解释器中增加一步,这会影响性能。另一种方法是重新调整您的算法以不依赖takewhile()
。如果没有关于你在做什么的详细信息,我目前无法提供帮助。
@MartijnPieters:是的,这就是我的意思。我什么都没做,OP 是其他人 ;) 至于ValueError
:从您的OneStepBuffered
的角度来看,这是完全合理的,但对于 OP 的任务,这对我来说似乎是一个错误。
好吧,假设这是一个有效的场景......注意:如果是这种情况,您可以捕获异常
假设一个无穷无尽但速度很快的迭代器。【参考方案2】:
鉴于可调用的GetNewChunkLogic()
将在第一个块上报告True
,然后是False
。
以下sn -p
-
解决了
takewhile
的“额外下一步”问题。
很优雅,因为您不必实现后退一步的逻辑。
def partition(pred, iterable):
'Use a predicate to partition entries into true entries and false entries'
# partition(is_odd, range(10)) --> 1 3 5 7 9 and 0 2 4 6 8
t1, t2 = tee(iterable)
return filter(pred, t1), filterfalse(pred, t2)
while True:
head, tail = partition(GetNewChunkLogic(), myIterator)
process(head)
myIterator = tail
但是,最优雅的方法是将您的 GetNewChunkLogic
修改为生成器并删除 while
循环。
【讨论】:
【参考方案3】:这是另一种方法。当谓词失败时产生一个值 (sentinel
),但 before 产生值本身。然后按不是sentinel
的值分组。
这里,group_by_predicate
需要一个返回谓词 (pred_gen
) 的函数。每次谓词失败时都会重新创建:
from itertools import groupby
def group_by_predicate(predicate_gen, _iter):
sentinel = object()
def _group_with_sentinel():
pred = predicate_gen()
for n in _iter:
while not pred(n):
yield sentinel
pred = predicate_gen()
yield n
g = _group_with_sentinel()
for k, g in groupby(g, lambda s: s!=sentinel):
if k:
yield g
然后可以这样使用:
def less_than_gen(maxn):
"""Return a predicate that returns true while the sum of inputs is < maxn"""
def pred(i):
pred.count += i
return pred.count < maxn
pred.count = 0
return pred
data = iter(list(range(9)) * 3)
for g in group_by_predicate(lambda: less_than_gen(15), data):
print(list(g))
输出总和小于15的数字组:
[0, 1, 2, 3, 4]
[5, 6]
[7]
[8, 0, 1, 2, 3]
[4, 5]
[6, 7]
[8, 0, 1, 2, 3]
[4, 5]
[6, 7]
[8]
【讨论】:
【参考方案4】:我遇到了同样的问题。您可能希望使用 itertools.tee
或 itertools.pairwise
(Python 3.10 中的新功能)来处理这个问题,但我认为这些解决方案不是很优雅。
我发现最好的方法就是重写 takewhile。主要基于documentation:
def takewhile_inclusive(predicate, it):
for x in it:
if predicate(x):
yield x
else:
yield x
break
在您的循环中,您可以优雅地使用解包单独处理最终元素:
*chunk,lastPiece = takewhile_inclusive(getNewChunkLogic(), myIterator)
然后你可以链接最后一块:
lastPiece = None
while True:
*chunk,lastPiece = takewhile_inclusive(getNewChunkLogic(), myIterator)
if lastPiece is not None:
myIterator = itertools.chain([lastPiece], myIterator))
【讨论】:
以上是关于如何不错过 itertools.takewhile() 之后的下一个元素的主要内容,如果未能解决你的问题,请参考以下文章