An efficient iterator for getting the top k minimum of a list

Posted: 2020-07-25 21:02:33

Question:

I have many lists of unsorted numbers, for example:

import random

N=1000000
x = [random.randint(0,N) for i in range(N)]

I only want the k smallest values. Currently this is my approach:

def f1(x,k): # O(nlogn)
    return sorted(x)[:k]

This does a lot of redundant work, since we are also sorting the remaining N-k elements. Enumerating doesn't help either:

def f2(x,k): # O(nlogn)
    y = []
    for idx,val in enumerate( sorted(x) ):
        if idx == k: break
        y.append(val)
    return y

Verifying that enumerating doesn't help:

if 1 : ## Time taken = 0.6364126205444336
    st1 = time.time()
    y = f1(x,3)
    et1 = time.time()
    print('Time taken = ', et1-st1)

if 1 : ## Time taken = 0.6330435276031494
    st2 = time.time()
    y = f2(x,3)
    et2 = time.time()
    print('Time taken = ', et2-st2)

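Perhaps I need a generator that keeps returning the next minimum of the list, and since getting the next minimum should be an O(1) operation, the function f3() should be just O(k), right? In that case, which GENERATOR function would work best?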

def f3(x,k): # O(k)
    y = []
    for idx,val in enumerate( GENERATOR ):
        if idx == k: break
        y.append(val)
    return y

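Edit 1

The analysis shown here is wrong; please ignore it and skip to Edit 3.

Possible lower bound: in terms of time complexity, I think this is the lowest bound achievable, but since it modifies the original list, it is not a solution to my problem.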

def f3(x,k): # O(k) Time
    y = []
    idx=0
    while idx<k:
        curr_min = min(x)
        x.remove(curr_min) # This removes from the original list
        y.append(curr_min)
        idx += 1
    return y

if 1 : ## Time taken = 0.07096505165100098
    st3 = time.time()
    y = f3(x,3)
    et3 = time.time()
    print('Time taken = ', et3-st3)

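O(N) time | O(N) storage: the best solution so far. However, it requires a copy of the original list, which is what leads to O(N) time and storage. An iterator that fetches the next minimum, called k times, would be O(1) storage and O(k) time.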

def f3(x,k): # O(N) Time | O(N) Storage
    y = []
    idx=0
    while idx<k:
        curr_min = min(x)
        x.remove(curr_min)
        y.append(curr_min)
        idx += 1
    return y

if 1 : ## Time taken = 0.0814204216003418
    st3 = time.time()
    y = f3(x.copy(),3) # pass a copy so the original list is left intact
    et3 = time.time()
    print('Time taken = ', et3-st3)

Edit 2

Thanks for pointing out my mistake above: getting the minimum of a list is O(n), not O(1).
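Given that correction, a lazy GENERATOR is still possible if the O(n) work is paid once up front. The following is a minimal sketch, not a definitive implementation: the name iter_smallest is hypothetical, and it assumes that copying the list (O(n) extra storage) is acceptable. Building the heap costs O(n); each value drawn afterwards costs O(log n), so taking k values costs O(n + k log n) overall.

```python
import heapq
from itertools import islice

def iter_smallest(values):
    """Yield the items of `values` in ascending order, one at a time."""
    heap = list(values)       # O(n) copy, so the caller's list is untouched
    heapq.heapify(heap)       # O(n) one-off cost
    while heap:
        yield heapq.heappop(heap)   # O(log n) per item actually requested

data = [7, 2, 9, 4, 1, 8]
first_three = list(islice(iter_smallest(data), 3))
print(first_three)
```

Because the generator is lazy, the caller can stop after any number of items without deciding k in advance.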

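Edit 3

Here is the complete analysis script after using the recommended solutions. This now raises more questions:

1) Is constructing x as a heap with heapq.heappush slower than building x with list.append and then calling heapq.heapify on it?

2) Is heapq.nsmallest slower if x is already a heap?

3) Current conclusion: don't heapq.heapify the current list; use heapq.nsmallest on it directly.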
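On questions 1 and 2, the asymptotic costs are well known even though the measured times depend on the machine: n calls to heapq.heappush cost O(n log n) in total, whereas a single heapq.heapify over a finished list is O(n). heapq.nsmallest treats its argument as a plain iterable, so it does not exploit an existing heap order. The sketch below only checks that both construction routes produce a valid heap holding the same items; the helper is_min_heap is hypothetical and written just for this illustration.

```python
import heapq
import random

def is_min_heap(h):
    """Check the heap invariant: every parent is <= each of its children."""
    return all(h[(i - 1) // 2] <= h[i] for i in range(1, len(h)))

values = [random.randint(0, 1000) for _ in range(1000)]

pushed = []
for v in values:
    heapq.heappush(pushed, v)   # O(log n) per push, O(n log n) in total

heapified = list(values)
heapq.heapify(heapified)        # O(n) in total

print(is_min_heap(pushed), is_min_heap(heapified))
```

The two heaps may lay out their items differently, but both satisfy the invariant, so heappop behaves identically on either.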

import time, random, heapq
import numpy as np

class Timer:
    def __init__(self, description):
        self.description = description
    def __enter__(self):
        self.start = time.perf_counter()
        return self
    def __exit__(self, *args):
        end = time.perf_counter()
        print(f"The time for '{self.description}' took: {end - self.start}.")


def f3(x,k):
    y = []
    idx=0
    while idx<k:
        curr_min = min(x)
        x.remove(curr_min)
        y.append(curr_min)
        idx += 1
    return y

def f_sort(x, k):
    y = []
    for idx,val in enumerate( sorted(x) ):
        if idx == k: break
        y.append(val)
    return y

def f_heapify_pop(x, k):
    heapq.heapify(x)
    return [heapq.heappop(x) for _ in range(k)]

def f_heap_pop(x, k):
    # Assumes x already satisfies the heap invariant.
    return [heapq.heappop(x) for _ in range(k)]

def f_heap_nsmallest(x, k):
    return heapq.nsmallest(k, x)

def f_np_partition(x, k):
    return np.partition(x, k)[:k]

if True : ## Constructing list vs heap
    N=1000000
    # N= 500000
    x_main = [random.randint(0,N) for i in range(N)]
    with Timer('constructing list') as t:
        x=[]
        for curr_val in x_main:
            x.append(curr_val)
    with Timer('constructing heap') as t:
        x_heap=[]
        for curr_val in x_main:
            heapq.heappush(x_heap, curr_val)
    with Timer('heapify x from a list') as t:
        x_heapify=[]
        for curr_val in x_main:
            x_heapify.append(curr_val)
        heapq.heapify(x_heapify)
    with Timer('x list to numpy') as t:
        x_np = np.array(x)
    """
    N=1000000
        The time for 'constructing list' took: 0.2717265225946903.
        The time for 'constructing heap' took: 0.45691753178834915.
        The time for 'heapify x from a list' took: 0.4259336367249489.
        The time for 'x list to numpy' took: 0.14815033599734306. 
    """

if True : ## Performing experiments on list vs heap
    TRIALS = 10
    ## Experiments on x as list : 
    with Timer('f3') as t:
        for _ in range(TRIALS):
            y = f3(x.copy(), 30)
        print(y)
    with Timer('f_sort') as t:
        for _ in range(TRIALS):
            y = f_sort(x.copy(), 30)
        print(y)
    with Timer('f_np_partition on x') as t:
        for _ in range(TRIALS):
            y = f_np_partition(x.copy(), 30)
        print(y)
    ## Experiments on x as list, but converted to heap in place : 
    with Timer('f_heapify_pop on x') as t:
        for _ in range(TRIALS):
            y = f_heapify_pop(x.copy(), 30)
        print(y)
    with Timer('f_heap_nsmallest on x') as t:
        for _ in range(TRIALS):
            y = f_heap_nsmallest(x.copy(), 30)
        print(y)
    ## Experiments on x_heap as heap : 
    with Timer('f_heap_pop on x_heap') as t:
        for _ in range(TRIALS):
            y = f_heap_pop(x_heap.copy(), 30)
        print(y)
    with Timer('f_heap_nsmallest on x_heap') as t:
        for _ in range(TRIALS):
            y = f_heap_nsmallest(x_heap.copy(), 30)
        print(y)
    ## Experiments on x_np as numpy array : 
    with Timer('f_np_partition on x_np') as t:
        for _ in range(TRIALS):
            y = f_np_partition(x_np.copy(), 30)
        print(y)
    # 
    """
    Experiments on x as list : 
        [0, 1, 1, 4, 5, 5, 5, 6, 6, 7, 7, 7, 10, 10, 11, 11, 12, 12, 12, 13, 13, 14, 18, 18, 19, 19, 21, 22, 24, 25]
        The time for 'f3' took: 10.180440502241254.
        [0, 1, 1, 4, 5, 5, 5, 6, 6, 7, 7, 7, 10, 10, 11, 11, 12, 12, 12, 13, 13, 14, 18, 18, 19, 19, 21, 22, 24, 25]
        The time for 'f_sort' took: 9.054768254980445.
        [ 1  5  5  1  0  4  5  6  7  6  7  7 12 12 11 13 11 12 13 18 10 14 10 18 19 19 21 22 24 25]
        The time for 'f_np_partition on x' took: 1.2620676811784506.

    Experiments on x as list, but converted to heap in place : 
        [0, 1, 1, 4, 5, 5, 5, 6, 6, 7, 7, 7, 10, 10, 11, 11, 12, 12, 12, 13, 13, 14, 18, 18, 19, 19, 21, 22, 24, 25]
        The time for 'f_heapify_pop on x' took: 0.8628390356898308.
        [0, 1, 1, 4, 5, 5, 5, 6, 6, 7, 7, 7, 10, 10, 11, 11, 12, 12, 12, 13, 13, 14, 18, 18, 19, 19, 21, 22, 24, 25]
        The time for 'f_heap_nsmallest on x' took: 0.5187360178679228.

    Experiments on x_heap as heap : 
        [0, 1, 1, 4, 5, 5, 5, 6, 6, 7, 7, 7, 10, 10, 11, 11, 12, 12, 12, 13, 13, 14, 18, 18, 19, 19, 21, 22, 24, 25]
        The time for 'f_heap_pop on x_heap' took: 0.2054140530526638.
        [0, 1, 1, 4, 5, 5, 5, 6, 6, 7, 7, 7, 10, 10, 11, 11, 12, 12, 12, 13, 13, 14, 18, 18, 19, 19, 21, 22, 24, 25]
        The time for 'f_heap_nsmallest on x_heap' took: 0.6638103127479553.
        [ 1  5  5  1  0  4  5  6  7  6  7  7 12 12 11 13 11 12 13 18 10 14 10 18 19 19 21 22 24 25]
        The time for 'f_np_partition on x_np' took: 0.2107151597738266.
    """

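Comments:

heapq.nsmallest is O(n) for a constant number of smallest items.

Getting the minimum is O(n), not O(1), by the way. Getting the minimum of a sorted list is O(1).

Methods for k smallest. Using a heap is one of the fastest methods, at O(n + k*log(n)) for the k smallest. It also shows quickselect to be O(n) if you don't need sorted output.

Ah, thanks for the comments. Indeed, getting the minimum is O(n), not O(1). It looks like a heap is the way to go.

You took my timing code but for some reason are timing the printing of the output. I know it's an insignificant amount, but there is no reason to do so, since that time is more or less independent of the algorithm producing the result.

Answer 1: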

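This is a classic problem, and the generally accepted solution is a data structure known as a heap. Below I ran 10 trials of each of the algorithms f3 and f_heap. As the value of the second argument, k, grows, the gap between the two performances widens. For k = 3, algorithm f3 took 0.76 seconds and algorithm f_heap took 0.54 seconds. But for k = 30, these values become 6.33 seconds and 0.54 seconds, respectively.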

import time, random, heapq

class Timer:
    def __init__(self, description):
        self.description = description

    def __enter__(self):
        self.start = time.perf_counter()
        return self

    def __exit__(self, *args):
        end = time.perf_counter()
        print(f"The time for {self.description} took: {end - self.start}.")


def f3(x,k): # O(N) Time | O(N) Storage
    y = []
    idx=0
    while idx<k:
        curr_min = min(x)
        x.remove(curr_min)
        y.append(curr_min)
        idx += 1
    return y


def f_heap(x, k): # O(nlogn)
    # if you do not need to retain a heap and just need the k smallest, then:
    #return heapq.nsmallest(k, x)

    heapq.heapify(x)
    return [heapq.heappop(x) for _ in range(k)]



N=1000000
x = [random.randint(0,N) for i in range(N)]

TRIALS = 10

with Timer('f3') as t:
    for _ in range(TRIALS):
        y = f3(x.copy(), 30)
print(y)

print()

with Timer('f_heap') as t:
    for _ in range(TRIALS):
        y = f_heap(x.copy(), 30)
print(y)

Prints:

The time for f3 took: 6.3301973.
[0, 1, 1, 7, 9, 11, 11, 13, 13, 14, 17, 18, 18, 18, 19, 20, 20, 21, 23, 24, 25, 25, 26, 27, 28, 28, 29, 30, 30, 31]

The time for f_heap took: 0.5372357999999995.
[0, 1, 1, 7, 9, 11, 11, 13, 13, 14, 17, 18, 18, 18, 19, 20, 20, 21, 23, 24, 25, 25, 26, 27, 28, 28, 29, 30, 30, 31]

A Python Demo
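The commented-out alternative inside f_heap can be sketched as follows. This is an illustration on a toy list, not a benchmark: heapq.nsmallest returns the k smallest items in ascending order and leaves its input untouched, whereas heapq.heapify and heapq.heappop reorder and shrink the list they are given, which is why the timing loop above passes x.copy().

```python
import heapq

data = [5, 3, 8, 1, 9, 2]

# Route 1: nsmallest reads the input without modifying it.
smallest = heapq.nsmallest(3, data)

# Route 2: heapify + heappop works in place, so operate on a copy.
heap = data.copy()
heapq.heapify(heap)
popped = [heapq.heappop(heap) for _ in range(3)]

print(smallest, popped, data)
```

Route 2 leaves behind a heap of the remaining items, which is useful if more values may be needed later; route 1 is simpler when only the k smallest are wanted.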

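Update

As suggested by @user2357112supportsMonica, using numpy.partition to select the k smallest values is indeed very fast if you are already working with a numpy array. But if you start with an ordinary list and count the time spent converting it to a numpy array just to use numpy.partition, then it is slower than the heapq approach: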

import numpy as np

def f_np_partition(x, k):
    # partition puts the k smallest values first (unordered); sort just those
    return sorted(np.partition(x, k)[:k])


with Timer('f_np_partition') as t:
    for _ in range(TRIALS):
        x_np = np.array(x)
        y = f_np_partition(x_np.copy(), 30) # don't really need to copy
print(y)

Relative timings:

The time for f3 took: 7.2039111.
[0, 2, 2, 3, 3, 3, 5, 6, 6, 6, 9, 9, 10, 10, 10, 11, 11, 12, 13, 13, 14, 16, 16, 16, 16, 17, 17, 18, 19, 20]

The time for f_heap took: 0.35521280000000033.
[0, 2, 2, 3, 3, 3, 5, 6, 6, 6, 9, 9, 10, 10, 10, 11, 11, 12, 13, 13, 14, 16, 16, 16, 16, 17, 17, 18, 19, 20]

The time for f_np_partition took: 0.8379164999999995.
[0, 2, 2, 3, 3, 3, 5, 6, 6, 6, 9, 9, 10, 10, 10, 11, 11, 12, 13, 13, 14, 16, 16, 16, 16, 17, 17, 18, 19, 20]

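Comments:

Thinking in terms of efficiency: how should I initialize x and append to it as a heap, rather than appending to it as a list and then converting it to a heap?

I also updated my answer to show that if you only need the k smallest elements of the list and no longer need the heap afterwards, there is heapq.nsmallest, which is even faster.

The heap-based options are the most convenient ones in the Python standard library, but quickselect has better asymptotic complexity.

@leonardltk1: Implementing it in Python would introduce an impractical amount of overhead, but NumPy has it under the name numpy.partition. With a NumPy array, you can call numpy.partition and then sort the part you're interested in (you can skip the sort if you don't care about the order of the k smallest values).

@Booboo: NumPy uses introselect to avoid O(n^2) worst-case performance. If it isn't making enough progress, it switches pivot selection to a slower method that is guaranteed to pick a good pivot. That said, NumPy does benefit a lot from being implemented in C, particularly from operating on raw machine data types in its inner loops. Most of heapq is also implemented in C, but nsmallest mostly isn't, and heapq can't operate on raw machine integers.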
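For readers unfamiliar with quickselect, here is a minimal pure-Python sketch of the idea, for illustration only. The function name k_smallest_quickselect is hypothetical, the pivot is chosen at random (so the running time is expected O(n), not the worst-case guarantee that introselect gives), and, as the comments above note, a pure-Python version carries enough interpreter overhead that it should not be expected to beat heapq or numpy.partition in practice.

```python
import random

def k_smallest_quickselect(values, k):
    """Return the k smallest items of `values` in ascending order."""
    items = list(values)          # work on a copy; the input is not modified
    if k <= 0:
        return []
    if k >= len(items):
        return sorted(items)
    lo, hi, target = 0, len(items) - 1, k - 1
    while lo < hi:
        pivot = items[random.randint(lo, hi)]
        i, j = lo, hi
        while i <= j:             # Hoare-style partition around the pivot value
            while items[i] < pivot:
                i += 1
            while items[j] > pivot:
                j -= 1
            if i <= j:
                items[i], items[j] = items[j], items[i]
                i += 1
                j -= 1
        if target <= j:
            hi = j                # the k-th smallest lies in the left part
        elif target >= i:
            lo = i                # or in the right part
        else:
            break                 # or it is already in its final position
    return sorted(items[:k])      # only the k selected items get sorted

data = [9, 4, 7, 1, 8, 2, 2, 6]
print(k_smallest_quickselect(data, 3))
```

The final sorted call touches only k items, so it can be dropped when the order of the result does not matter.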
