Is there an alternative for zip(*iterable) when the iterable consists of millions of elements?

Posted: 2020-12-06 13:10:09

Question: I came across code like this:
from random import randint

class Point:
    def __init__(self, x, y):
        self.x = x
        self.y = y

points = [Point(randint(1, 10), randint(1, 10)) for _ in range(10)]

xs = [point.x for point in points]
ys = [point.y for point in points]
And I don't think this code is Pythonic, because it repeats itself. If another dimension gets added to the Point class, a whole new loop has to be written, like:
zs = [point.z for point in points]
So I tried writing something like this to make it more Pythonic:
xs, ys = zip(*[(point.x, point.y) for point in points])
And if a new dimension is added, no problem:
xs, ys, zs = zip(*[(point.x, point.y, point.z) for point in points])
But with millions of points this is almost 10x slower than the other solution, even though it uses only one loop. I think it's because the * operator has to unpack millions of arguments into the zip function, which is horrible. So my question is:

Is there a way to change the code above so that it is as fast as before while staying Pythonic (without using third-party libraries)?
Comments:
First of all, you could use a generator instead of building a full list: zip(*((point.x, point.y, point.z) for point in points)). How much that would help compared to using another approach entirely, I can't say.
@deceze I don't know why, but it's even slower.
@deceze: That won't help at all. Argument unpacking always converts the iterable to a tuple, so you'd just be paying for a more expensive generator expression to fill the tuple, instead of a cheaper list comprehension followed by a fast shallow copy.
@ShadowRanger I see, that explains it, thanks.
@Tryph Of course it would be faster, but I think that's cheating :) I could write this code in C and it would be 5x faster. I'm trying to understand why it's slow and how to improve it.
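To make the argument-unpacking point above concrete, here is a small demonstration (my addition, not from the thread): unpacking a generator with * drains it completely into a tuple before the called function even starts running.

def takes_args(*args):
    # the generator has already been collected into the tuple `args`
    # by the time this body runs
    return type(args), len(args)

gen = (n * n for n in range(1_000_000))
print(takes_args(*gen))  # (<class 'tuple'>, 1000000)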
Answer 1:

The problem with zip(*iter) is that it iterates over the entire iterable and passes the resulting sequence to zip as positional arguments.
So these two are functionally the same:

Using *: xs, ys = zip(*[(p[0], p[1]) for p in ((0, 1), (0, 2), (0, 3))])
Using positional arguments: xs, ys = zip((0, 1), (0, 2), (0, 3))
Obviously, with millions of positional arguments, that gets slow. An iterator-based approach is the only workaround.
I searched the web for python itertools unzip. Sadly, the closest thing itertools offers is tee. The gist linked below returns a tuple of itertools.tee iterators from an implementation of iunzip: https://gist.github.com/andrix/106334. I had to convert it to python3:
from random import randint
import itertools
import time
from operator import itemgetter

def iunzip(iterable):
    """iunzip is the same as zip(*iter), but returns iterators instead of
    expanding the iterable. Mostly useful for large sequences."""
    # peek at the first tuple to learn how many output iterators are needed
    _tmp, iterable = itertools.tee(iterable, 2)
    iters = itertools.tee(iterable, len(next(_tmp)))
    # the i-th output iterator yields the i-th element of every tuple
    return (map(itemgetter(i), it) for i, it in enumerate(iters))
class Point:
    def __init__(self, x, y):
        self.x = x
        self.y = y
points = [Point(randint(1, 10), randint(1, 10)) for _ in range(1000000)]

itime = time.time()
xs = [point.x for point in points]
ys = [point.y for point in points]
otime = time.time() - itime
itime += otime
print(f"original: {otime}")

xs, ys = zip(*[(p.x, p.y) for p in points])
otime = time.time() - itime
itime += otime
print(f"unpacking into zip: {otime}")

xs, ys = iunzip(((p.x, p.y) for p in points))
for _ in zip(xs, ys): pass  # consume both iterators in lockstep
otime = time.time() - itime
itime += otime
print(f"iunzip: {otime}")
Output:
original: 0.1282501220703125
unpacking into zip: 1.286362886428833
iunzip: 0.3046858310699463
So iterators are definitely better than unpacking into positional arguments. Not to mention that when I went up to 10 million points, my 4GB of memory got eaten... However, I'm not convinced the iunzip above is as optimized as it could be if it were a python builtin, considering that iterating twice as the "original" method does is still by far the fastest way to unzip (about 4x faster, trying various numbers of points).

It seems like iunzip should be a thing. I'm surprised it isn't a python builtin or part of itertools...
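One caveat worth adding (my note, not part of the original answer): itertools.tee buffers each item until every derived iterator has consumed it, so iunzip only stays memory-friendly when the returned iterators are advanced roughly in lockstep, as the zip(xs, ys) loop above does. A sketch of the failure mode, assuming the iunzip defined above:

xs_it, ys_it = iunzip((p.x, p.y) for p in points)
xs = list(xs_it)  # draining xs_it first forces tee to buffer every pair
                  # for ys_it, so memory grows with len(points)
ys = list(ys_it)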
Answer 2:

I just tested several ways of zipping Point coordinates and looked at how their performance evolves as the number of points grows.

Here are the functions I used for the test:
def hardcode(points):
    # a hand-crafted comprehension for each coordinate
    return [point.x for point in points], [point.y for point in points]

def using_zip(points):
    # using the "problematic" zip function
    return zip(*((point.x, point.y) for point in points))

def loop_and_comprehension(points):
    # building one comprehension per coordinate name, in a loop
    zipped = []
    for coordinate in ('x', 'y'):
        zipped.append([getattr(point, coordinate) for point in points])
    return zipped

def nested_comprehension(points):
    # building the comprehensions from a list of coordinate names,
    # using nested comprehensions
    return [
        [getattr(point, coordinate) for point in points]
        for coordinate in ('x', 'y')
    ]
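Before timing, a quick sanity check (my addition, assuming the Point class from the question) that all four variants produce the same columns:

pts = [Point(1, 2), Point(3, 4)]
expected = [[1, 3], [2, 4]]
for func in (hardcode, using_zip, loop_and_comprehension, nested_comprehension):
    assert [list(column) for column in func(pts)] == expected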
Using timeit, I timed each function with different numbers of points. Here are the results:
comparing processing times using 10 points and 10000000 iterations
hardcode................. 14.12024447 [+0%]
using_zip................ 16.84289724 [+19%]
loop_and_comprehension... 30.83631476 [+118%]
nested_comprehension..... 30.45758349 [+116%]
comparing processing times using 100 points and 1000000 iterations
hardcode................. 9.30594717 [+0%]
using_zip................ 13.74953714 [+48%]
loop_and_comprehension... 19.46766583 [+109%]
nested_comprehension..... 19.27818860 [+107%]
comparing processing times using 1000 points and 100000 iterations
hardcode................. 7.90372457 [+0%]
using_zip................ 12.51523594 [+58%]
loop_and_comprehension... 18.25679913 [+131%]
nested_comprehension..... 18.64352790 [+136%]
comparing processing times using 10000 points and 10000 iterations
hardcode................. 8.27348382 [+0%]
using_zip................ 18.23079485 [+120%]
loop_and_comprehension... 18.00183383 [+118%]
nested_comprehension..... 17.96230063 [+117%]
comparing processing times using 100000 points and 1000 iterations
hardcode................. 9.15848662 [+0%]
using_zip................ 22.70730675 [+148%]
loop_and_comprehension... 17.81126971 [+94%]
nested_comprehension..... 17.86892597 [+95%]
comparing processing times using 1000000 points and 100 iterations
hardcode................. 9.75002857 [+0%]
using_zip................ 23.13891725 [+137%]
loop_and_comprehension... 18.08724660 [+86%]
nested_comprehension..... 18.01269820 [+85%]
comparing processing times using 10000000 points and 10 iterations
hardcode................. 9.96045920 [+0%]
using_zip................ 23.11653558 [+132%]
loop_and_comprehension... 17.98296033 [+81%]
nested_comprehension..... 18.17317708 [+82%]
comparing processing times using 100000000 points and 1 iterations
hardcode................. 64.58698246 [+0%]
using_zip................ 92.53437881 [+43%]
loop_and_comprehension... 73.62493845 [+14%]
nested_comprehension..... 62.99444739 [-2%]
We can see that the gap between the "hardcoded" solution and the solutions built with getattr keeps shrinking as the number of points grows.

So for a large number of points, using a comprehension generated from a list of coordinate names is probably a good idea:
[[getattr(point, coordinate) for point in points]
 for coordinate in ('x', 'y')]
However, for a small number of points, it is the worst solution (of those I tested, at least).
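As a usage sketch (my addition, assuming Point gains a z attribute as in the question), adding a dimension then only means extending the coordinate tuple:

xs, ys, zs = [[getattr(point, c) for point in points]
              for c in ('x', 'y', 'z')]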
For reference, here is the code I used to run this benchmark:
import timeit

...

def compare(nb_points, nb_iterations):
    reference = None
    points = [Point(randint(1, 100), randint(1, 100))
              for _ in range(nb_points)]
    print("comparing processing times using {} points and {} iterations"
          .format(nb_points, nb_iterations))
    for func in (hardcode, using_zip, loop_and_comprehension,
                 nested_comprehension):
        duration = timeit.timeit(lambda: func(points), number=nb_iterations)
        print('{:.<25} {:0=2.8f} [{:0>+.0%}]'
              .format(func.__name__, duration,
                      0 if reference is None else (duration / reference - 1)))
        if reference is None:
            reference = duration
    print("-" * 80)

compare(10, 10000000)
compare(100, 1000000)
compare(1000, 100000)
compare(10000, 10000)
compare(100000, 1000)
compare(1000000, 100)
compare(10000000, 10)
compare(100000000, 1)