在 numpy 数组中查找最大 N 个元素的快速方法

Posted

技术标签:

【中文标题】在 numpy 数组中查找最大 N 个元素的快速方法【英文标题】:A fast way to find the largest N elements in an numpy array 【发布时间】:2012-05-07 10:17:06 【问题描述】:

我知道我可以这样做:

import numpy as np
N=10
a=np.arange(1,100,1)
np.argsort()[-N:]

但是,由于它进行了完整排序,因此速度非常慢。

我想知道 numpy 是否提供了一些快速完成的方法。

【问题讨论】:

How to get indices of N maximum values in a numpy array?的可能重复 【参考方案1】:

numpy 1.8 实现了执行部分排序的 partitionargpartition(在 O(n) 时间内,而不是 O(n) * log(n) 的完整排序)。

import numpy as np

test = np.array([9,1,3,4,8,7,2,5,6,0])

temp = np.argpartition(-test, 4)
result_args = temp[:4]

temp = np.partition(-test, 4)
result = -temp[:4]

结果:

>>> result_args
array([0, 4, 8, 5]) # indices of highest vals
>>> result
array([9, 8, 6, 7]) # highest vals

时间:

In [16]: a = np.arange(10000)

In [17]: np.random.shuffle(a)

In [18]: %timeit np.argsort(a)
1000 loops, best of 3: 1.02 ms per loop

In [19]: %timeit np.argpartition(a, 100)
10000 loops, best of 3: 139 us per loop

In [20]: %timeit np.argpartition(a, 1000)
10000 loops, best of 3: 141 us per loop

【讨论】:

注意可能对其他人有帮助:示例不是最佳选择,因为不能保证结果是有序的 @user3080953。我从不说保证结果是有序的,这就是部分排序。在我提供的示例中:[9, 8, 6, 7] 很明显,n 个最高值不是按顺序排列的。 是的,事后看来,这很明显,因为您无法在 O(n) 中进行排序。我花了 20 分钟寻找错误,并认为这可能对其他人阅读本文有所帮助 @user3080953。尝试将“kth”设置为序列,如 numpy.argpartition 的文档中所述——“如果提供了第 k 个序列,它将立即将它们全部划分到它们的排序位置。”而且,文档后面的示例 -- >>> x = np.array([3, 4, 2, 1]) >>> x[np.argpartition(x, 3)] array([2, 1, 3 , 4]) >>> x[np.argpartition(x, (1, 3))] 数组([1, 2, 3, 4]) docs.scipy.org/doc/numpy/reference/generated/… @dennlinger 'inverted',你指的是-test吗? np.partition 默认按升序排序。为了按降序排序,我们可以将所有数字变为负数(array([-9, -1, -3, -4, -8, -7, -2, -5, -6, 0]))并在该数组中排序。【参考方案2】:

bottleneck 模块有一个快速的部分排序方法,可以直接使用 Numpy 数组:bottleneck.partition()

注意bottleneck.partition() 返回排序后的实际值,如果您想要排序值的索引(numpy.argsort() 返回的内容),您应该使用bottleneck.argpartition()

我已经进行了基准测试:

z = -bottleneck.partition(-a, 10)[:10] z = a.argsort()[-10:] z = heapq.nlargest(10, a)

其中a 是一个随机的 1,000,000 元素数组。

时间安排如下:

bottleneck.partition():每个循环 25.6 毫秒 np.argsort():每个循环 198 毫秒 heapq.nlargest():每个循环 358 毫秒

【讨论】:

@Mike Graham:感谢您的编辑,但nanargmax() 所做的事情与 OP 的要求完全不同。我要回滚编辑。如果我遗漏了什么,请纠正我。 可能瓶颈更快,但由于EPD7.1中没有提供,我们可能不会使用它。 @HailiangZhang:我也很想看到bottleneck 加入环保署。 郑重声明,bottleneck.partsort()np.argsort() 正在做两件略有不同的事情。它们分别返回一个值和一个索引。如果您希望瓶颈返回索引,请使用bottleneck.argpartsort heapq.nlargest 的时间安排不太公平。最好运行heapq.nlargest(10, a.tolist())【参考方案3】:

我遇到了这个问题,因为这个问题已经 5 年了,我不得不重做所有基准测试并更改瓶颈的语法(不再有 partsort,现在是 partition)。

我使用与 kwgoodman 相同的参数,除了检索的元素数量,我增加到 50(以更好地适应我的特定情况)。

我得到了这些结果:

bottleneck 1: 01.12 ms per loop
bottleneck 2: 00.95 ms per loop
pandas      : 01.65 ms per loop
heapq       : 08.61 ms per loop
numpy       : 12.37 ms per loop
numpy 2     : 00.95 ms per loop

因此,bottleneck_2 和 numpy_2(adas 的解决方案)并列。 但是,使用 np.percentile (numpy_2) 您已经对那些 topN 元素进行了排序,而其他解决方案则并非如此。另一方面,如果您也对这些元素的索引感兴趣,则百分位数没有用处。

我也添加了 pandas,它在下面使用了瓶颈,如果有的话 (http://pandas.pydata.org/pandas-docs/stable/install.html#recommended-dependencies)。如果您已经有一个 pandas Series 或 DataFrame 可以开始,那么您就掌握得很好,只需使用 nlargest 就可以了。

用于基准测试的代码如下(请使用python 3):

import time
import numpy as np
import bottleneck as bn
import pandas as pd
import heapq

def bottleneck_1(a, n):
    return -bn.partition(-a, n)[:n]

def bottleneck_2(a, n):
    return bn.partition(a, a.size-n)[-n:]

def numpy(a, n):
    return a[a.argsort()[-n:]]

def numpy_2(a, n):
    M = a.shape[0]
    perc = (np.arange(M-n,M)+1.0)/M*100
    return np.percentile(a,perc)

def pandas(a, n):
    return pd.Series(a).nlargest(n)

def hpq(a, n):
    return heapq.nlargest(n, a)

def do_nothing(a, n):
    return a[:n]

def benchmark(func, size=1000000, ntimes=100, topn=50):
    t1 = time.time()
    for n in range(ntimes):
        a = np.random.rand(size)
        func(a, topn)
    t2 = time.time()
    ms_per_loop = 1000000 * (t2 - t1) / size
    return ms_per_loop

t1 = benchmark(bottleneck_1)
t2 = benchmark(bottleneck_2)
t3 = benchmark(pandas)
t4 = benchmark(hpq)
t5 = benchmark(numpy)
t6 = benchmark(numpy_2)
t0 = benchmark(do_nothing)

print("bottleneck 1: :05.2f ms per loop".format(t1 - t0))
print("bottleneck 2: :05.2f ms per loop".format(t2 - t0))
print("pandas      : :05.2f ms per loop".format(t3 - t0))
print("heapq       : :05.2f ms per loop".format(t4 - t0))
print("numpy       : :05.2f ms per loop".format(t5 - t0))
print("numpy 2     : :05.2f ms per loop".format(t6 - t0))

【讨论】:

感谢代码!我还测试了 np.argpartition,发现当 argpartition 设置为查找前 1 个元素时,它比 np.argmax 慢 10 倍。【参考方案4】:

提出的瓶颈解决方案中的每个负号

-bottleneck.partsort(-a, 10)[:10]

制作数据的副本。我们可以通过这样做来删除副本

bottleneck.partsort(a, a.size-10)[-10:]

还有建议的 numpy 解决方案

a.argsort()[-10:]

返回索引而不是值。解决方法是使用索引来查找值:

a[a.argsort()[-10:]]

两种瓶颈解决方案的相对速度取决于初始数组中元素的顺序,因为这两种方法在不同点对数据进行分区。

换句话说,使用任何一个特定的随机数组进行计时可以使任一方法看起来更快。

平均 100 个随机数组的时间,每个数组有 1,000,000 个元素,得到

-bn.partsort(-a, 10)[:10]: 1.76 ms per loop
bn.partsort(a, a.size-10)[-10:]: 0.92 ms per loop
a[a.argsort()[-10:]]: 15.34 ms per loop

其中计时码如下:

import time
import numpy as np
import bottleneck as bn

def bottleneck_1(a):
    return -bn.partsort(-a, 10)[:10]

def bottleneck_2(a):
    return bn.partsort(a, a.size-10)[-10:]

def numpy(a):
    return a[a.argsort()[-10:]]

def do_nothing(a):
    return a

def benchmark(func, size=1000000, ntimes=100):
    t1 = time.time()
    for n in range(ntimes):
        a = np.random.rand(size)
        func(a)
    t2 = time.time()
    ms_per_loop = 1000000 * (t2 - t1) / size
    return ms_per_loop

t1 = benchmark(bottleneck_1)
t2 = benchmark(bottleneck_2)
t3 = benchmark(numpy)
t4 = benchmark(do_nothing)

print "-bn.partsort(-a, 10)[:10]: %0.2f ms per loop" % (t1 - t4)
print "bn.partsort(a, a.size-10)[-10:]: %0.2f ms per loop" % (t2 - t4)
print "a[a.argsort()[-10:]]: %0.2f ms per loop" % (t3 - t4)

【讨论】:

【参考方案5】:

也许heapq.nlargest

import numpy as np
import heapq

x = np.array([1,-5,4,6,-3,3])

z = heapq.nlargest(3,x)

结果:

>>> z
[6, 4, 3]

如果您想使用 bottleneck 查找 n 最大元素的索引,您可以使用 bottleneck.argpartsort

>>> x = np.array([1,-5,4,6,-3,3])
>>> z = bottleneck.argpartsort(-x, 3)[:3]
>>> z
array([3, 2, 5]

【讨论】:

但是heap q其实比较慢(下个回复也提到了)。【参考方案6】:

你也可以使用 numpy 的百分位函数。就我而言,它比bottleneck.partsort() 稍快:

import timeit
import bottleneck as bn

N,M,K = 10,1000000,100

start = timeit.default_timer()
for k in range(K):
    a=np.random.uniform(size=M)
    tmp=-bn.partsort(-a, N)[:N]
stop = timeit.default_timer()
print (stop - start)/K

start = timeit.default_timer()
perc = (np.arange(M-N,M)+1.0)/M*100
for k in range(K):
    a=np.random.uniform(size=M)
    tmp=np.percentile(a,perc)
stop = timeit.default_timer()
print (stop - start)/K

每个循环的平均时间:

bottleneck.partsort():59 毫秒 np.percentile(): 54 毫秒

【讨论】:

请注意,默认情况下,百分位数可能会插入一些值。如果您希望完全与输入数组中的值相同,您可以将参数interpolation='nearest' 添加到对np.percentile 的调用中。有关详细信息,请参阅documentation。【参考方案7】:

如果将数组存储为数字列表没有问题,您可以使用

import heapq
heapq.nlargest(N, a)

获得N 最大的成员。

【讨论】:

以上是关于在 numpy 数组中查找最大 N 个元素的快速方法的主要内容,如果未能解决你的问题,请参考以下文章

如何找到 Numpy 数组的 M 个元素的 N 个最大乘积子数组?

替换所有小于每行中“n”个最大项目的numpy数组值

Python | 快速获取某一列数组中前 N 个最大值/最小值的索引 | 三种方法总结

Python | 快速获取某一列数组中前 N 个最大值/最小值的索引 | 三种方法总结

如何查找无序数组中的Top n

numpy的通用函数:快速的元素级数组函数