(节省内存)将`sorted`作为生成器来实现
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了(节省内存)将`sorted`作为生成器来实现相关的知识,希望对你有一定的参考价值。
我想这不是一个特别新的话题,我想有比我的更好的实现。我正在寻找(a)我正在处理的算法类型--它的实际名称或类似的名称--以及(b)潜在的更好的实现。
一般问题:想象一个列表 a
,它非常长--太长了,无法多次放入内存。列表中包含了一个 "随机 "的事物序列,这些事物允许被排序(<
, >
和 ==
工作)。) 我想按升序迭代列表中的所有条目,包括重复的条目,但不复制列表,也不产生任何类似 "极端 "长度的条目。我还想保持列表中条目的原始顺序。a
即原地 sort
被排除在外。所以我基本上希望在不修改原始数据源的情况下,尽量减少排序所需的内存。
Python的sorted并不接触原始数据,而是生成一个新的列表,与原始数据具有相同的大小长度。因此,我的基本想法是重新实施 sorted
作为发电机。
def sorted_nocopy_generator(data_list):
state_max = max(data_list)
state = min(data_list)
state_count = data_list.count(state)
for _ in range(state_count):
yield state
index = state_count
while index < len(data_list):
new_state = state_max
for entry in data_list:
if state < entry < new_state:
new_state = entry
state = new_state
state_count = data_list.count(state)
for _ in range(state_count):
yield state
index += state_count
它可以被测试如下。
import random
a_min = 0
a_max = 10000
a = list(range(a_min, a_max)) # test data
a.extend((random.randint(a_min, a_max - 1) for _ in range(len(a) // 10))) # some double entries
random.shuffle(a) # "random" order
a_control = a.copy() # for verification that a is not altered
a_test_sorted_nocopy_generator = list(sorted_nocopy_generator(a))
assert a == a_control
a_test_sorted = sorted(a)
assert a == a_control
assert a_test_sorted == a_test_sorted_nocopy_generator
它的尺度是O(N^2), 就像bubblesort那样. 我在寻找什么样的算法?如何优化这个东西(可能是通过买卖 一些 内存)?)
在这里画个草图。 哪儿 N = len(data_list)
和 S = sqrt(N)
使用 O(S)
额外的内存,并采取最坏的情况 O(N*log(N))
时间。
每一个连续的片长
S
中的原始数据,将该片复制到一个临时列表中,使用.sort()
来对其进行排序,然后将结果写入一个唯一的临时文件。 将会有大约S
所有的临时文件。将这些临时文件送入
heapq.merge()
. 那是一个发电机,只追踪到了S
当前最小值S
输入,所以这部分也有O(S)
内存负担。删除临时文件。
内存越多,所需的临时文件越少,速度也就越快。
削减常数因子
正如评论中指出的,次二次元时间算法的希望不大。 然而,你可以通过削减数据的传递次数来削减原算法中的常数因子。 这是一个方法,产生下一个 K
每次通过数据时的条目。 不过总体上仍然是二次元时间。
def sorted_nocopy_generator(data_list, K=100):
import itertools
from bisect import insort
assert K >= 1
total = 0
too_small = None
while total < len(data_list):
active = [] # hold the next K entries
entry2count = {}
for entry in data_list:
if entry in entry2count:
entry2count[entry] += 1
elif ((too_small is None or too_small < entry) and
(len(active) < K or entry < active[-1])):
insort(active, entry)
entry2count[entry] = 1
if len(active) > K: # forget the largest
del entry2count[active.pop()]
for entry in active:
count = entry2count[entry]
yield from itertools.repeat(entry, count)
total += count
too_small = active[-1]
而消除最坏的情况
就像 @btilly 的回答一样,上面代码中最糟糕的情况可以通过使用 max heap 来规避。 然后在上面的代码中添加一个新的条目到 active
有最坏情况下的时间 O(log(K))
而不是 O(K)
.
幸运的是... heapq
模块已经为这个目的提供了一些可用的东西。 heapq
的用途并没有暴露出来。
所以下面的特殊情况下,最大的最小的 K
感兴趣的条目,使用 .count()
(就像你原来的程序一样)做一个完整的通道来计算有多少个。
但不需要对 每一 独一无二的元素,它只需要在每一个 K
元素。
额外的内存使用量与 K
.
def sorted_nocopy_generator(data_list, K=100):
import itertools
from heapq import nsmallest
assert K >= 1
too_small = None
ntodo = len(data_list)
while ntodo:
if too_small is None:
active = nsmallest(K, data_list)
else:
active = nsmallest(K, (x for x in data_list
if x > too_small))
too_small = active[-1]
for x in active:
if x == too_small:
break
yield x
ntodo -= 1
count = data_list.count(too_small)
yield from itertools.repeat(too_small, count)
ntodo -= count
assert ntodo >= 0
假设你有以下的记忆 k
的东西在一个单独的列表中。 然后下面的比例会像 O((n + n^2/k) log(k))
. 在极端情况下 k
常数,它是二次型的,如果 k
是一个固定的分数 n
那就是 O(n log(n))
.
这个想法是建立一个缓冲区的。k
最小的东西,你还没有返回。 把它变成一个迷你堆,然后用堆操作及时从它那里返回东西。O(log(k))
每返回一个元素。 当缓冲区为空时,重新填满缓冲区,然后再进行之前的操作。 缓冲区开始时是空的)。
为了创建缓冲区,我们扫描数组,放入比最后返回的元素大的元素,直到达到 k
缓冲区中的东西。 然后把它变成一个最大堆,当堆中的东西大于返回的东西,小于已有的东西时,就把它替换掉。 当它被重新填满时,将它重新组织成一个min-heap。
你将需要重新填充缓冲区 n/k
次。 每一次都是 O(n log(k))
操作最坏情况。 (如果 k << n
而数组是随机顺序的,那么平均运行时间是 O(n)
因为几乎所有的东西都会与堆中的最大值进行比较,然后丢弃)。)
重要的边缘情况。 如果你有重复的东西,你需要考虑到你的缓冲区只包含了一些最大的东西的副本的情况。 一种方法是跟踪最大的东西有多少副本应该在堆中,但实际上并没有存储在那里。 这样在你清空堆后,你就可以把所有缺失的副本也返回来,然后继续扫描更大的元素。
以上是关于(节省内存)将`sorted`作为生成器来实现的主要内容,如果未能解决你的问题,请参考以下文章