交错两个 numpy 索引数组,每个数组中的一项
Posted
技术标签:
【中文标题】交错两个 numpy 索引数组,每个数组中的一项【英文标题】:Interleaving two numpy index arrays, one item from each array 【发布时间】:2012-08-03 08:30:18 【问题描述】:我有两个有序的 numpy 数组,我想将它们交错,以便我从第一个数组中获取一个项目,然后从第二个数组中获取另一个,然后返回到第一个 - 获取下一个大于我刚才的那个的项目从第二个等等开始。 这些实际上是其他数组的索引数组,只要操作是矢量化的,我就可以对原始数组进行操作(当然,将索引数组作为向量操作处理会很棒)。
示例(可以假设数组的交集为空)
a = array([1,2,3,4,7,8,9,10,17])
b = array([5,6,13,14,15,19,21,23])
我想得到 [1,5,7,13,17,19]
【问题讨论】:
仅供参考,我做了一些测试,这个pure python solution 实际上比numpy-based solution 快,至少根据我的测试。 【参考方案1】:我认为您将很难将numpy
矢量化应用于此问题并保持线性性能。这需要存储相当多的状态:a
中的当前索引、b
中的当前索引和threshold
中的当前索引。 numpy
没有提供很多有状态的向量化函数。事实上,我脑海中唯一能想到的就是ufunc.reduce
,它不太适合这个问题——尽管它当然可以被硬塞到工作中。
事实上,就在我发布此消息后,我的浏览器更新为excellent vectorized solution。但该解决方案需要排序,即 O(n log n);事实上,经过一些测试,我发现下面的纯 python 解决方案对于所有输入都更快!
def interleave_monotonic(a, b):
try:
a = iter(a)
threshold = next(a)
yield threshold
for current in b:
if current > threshold:
threshold = current
yield threshold
while current <= threshold:
current = next(a)
threshold = current
yield threshold
except StopIteration:
return
请注意,如果a
为空,则返回一个空的迭代器,但如果b
为空,则返回一个包含a
的第一个值的单项迭代器。我认为这与您的示例一致,但这是一个我不确定的极端情况。此外,基于numpy
的解决方案表现出略微不同的行为,始终以a[0]
和b[0]
中的较小者开头,而以上始终以a
的第一个值开头,无论如何。我修改了上面的内容以检查以下测试,这些测试清楚地表明纯 python 解决方案是最快的。
定义:
def interleave_monotonic_np(a, b):
ab = np.hstack((a, b))
s = np.argsort(ab)
t = np.hstack((np.zeros_like(a), np.ones_like(b)))[s]
return ab[s][np.concatenate(([True], t[1:] != t[:-1]))]
def interleave_monotonic(a, b):
a, b = (a, b) if a[0] <= b[0] else (b, a)
try:
a = iter(a)
threshold = next(a)
yield threshold
for current in b:
if current > threshold:
threshold = current
yield threshold
while current <= threshold:
current = next(a)
threshold = current
yield threshold
except StopIteration:
return
def interleave_monotonic_py(a, b):
return numpy.fromiter(interleave_monotonic(a, b), dtype='int64')
def get_a_b(n):
rnd = random.sample(xrange(10 ** 10), n * 2)
return sorted(rnd[0::2]), sorted(rnd[1::2])
测试:
>>> for e in range(7):
... a, b = get_a_b(10 ** e)
... print (interleave_monotonic_np(a, b) ==
... interleave_monotonic_py(a, b)).all()
... %timeit interleave_monotonic_np(a, b)
... %timeit interleave_monotonic_py(a, b)
...
True
10000 loops, best of 3: 85.6 us per loop
100000 loops, best of 3: 5.53 us per loop
True
10000 loops, best of 3: 91.7 us per loop
100000 loops, best of 3: 9.19 us per loop
True
10000 loops, best of 3: 144 us per loop
10000 loops, best of 3: 46.5 us per loop
True
1000 loops, best of 3: 711 us per loop
1000 loops, best of 3: 445 us per loop
True
100 loops, best of 3: 6.67 ms per loop
100 loops, best of 3: 4.42 ms per loop
True
10 loops, best of 3: 135 ms per loop
10 loops, best of 3: 55.7 ms per loop
True
1 loops, best of 3: 1.58 s per loop
1 loops, best of 3: 654 ms per loop
【讨论】:
+1 然后简单地使用np.fromiter(interleave_monotonic(a,b),np.int)
【参考方案2】:
你可以把问题分成两部分:
-
使
a
和b
迭代器仅在它们满足时才产生值
比一些threshold
大。
使用 itertools
(本质上是 George Sakkis 的循环方法)在两个迭代器之间交替。
import itertools
import numpy as np
a = np.array([1,2,3,4,7,8,9,10,17])
b = np.array([5,6,13,14,15,19,21,23])
def takelarger(iterable):
for x in iterable:
if x > takelarger.threshold:
takelarger.threshold = x
yield x
takelarger.threshold = 0
def alternate(*iterables):
# Modified version of George Sakkis' roundrobin recipe
# http://docs.python.org/library/itertools.html#recipes
pending = len(iterables)
nexts = itertools.cycle(iter(it).next for it in iterables)
while pending:
try:
for n in nexts:
yield n()
except StopIteration:
# Unlike roundrobin, stop when any iterable has been consumed
break
def interleave(a, b):
takelarger.threshold = 0
return list(alternate(takelarger(a),takelarger(b)))
print(interleave(a,b))
产量
[1, 5, 7, 13, 17, 19]
【讨论】:
谢谢。非常优雅的解决方案。我希望找到一个具有数组操作的解决方案,该解决方案将以“矢量化”方式工作,并且希望更快。【参考方案3】:受其他帖子的启发,我对纯 python 代码有另一个想法,它需要事先排序,但与 a 和 b 对称
def myIterator4(a, b):
curr = iter(a)
next = iter(b)
try:
max = curr.next()
tmp = next.next() # Empty iterator if b is empty
while True:
yield max
while tmp<=max:
tmp = next.next()
max = tmp
curr, next = next, curr
except StopIteration:
return
如果 a 或 b 为空,则返回空迭代器。
【讨论】:
【参考方案4】:矢量化解决方案(教学风格,易于理解)
我们可以通过使用判别器索引扩充数组来将其向量化,例如 a
被标记为 0
并且 b
被标记为 1
:
a_t = np.vstack((a, np.zeros_like(a)))
b_t = np.vstack((b, np.ones_like(b)))
现在,让我们组合和排序:
c = np.hstack((a_t, b_t))[:, np.argsort(np.hstack((a, b)))]
array([[ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 13, 14, 15, 17, 19, 21, 23],
[ 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 1, 1, 1, 0, 1, 1, 1]])
你可以看到现在元素是有序的但保留了它们的标签,所以我们可以看到哪些元素来自a
和b
。
所以,让我们选择第一个元素以及标签从0
(对于a
)变为1
(对于b
)并再次返回的每个元素:
c[:, np.concatenate(([True], c[1, 1:] != c[1, :-1]))][0]
array([ 1, 5, 7, 13, 17, 19])
高效的矢量化解决方案
您可以通过将项目及其标签保存在单独(但并行)的数组中来稍微更有效地做到这一点:
ab = np.hstack((a, b))
s = np.argsort(ab)
t = np.hstack((np.zeros_like(a), np.ones_like(b)))[s]
ab[s][np.concatenate(([True], t[1:] != t[:-1]))]
array([ 1, 5, 7, 13, 17, 19])
这比上面的方案效率略高;我得到的平均时间是 45 微秒,而不是 90 微秒,尽管您的条件可能会有所不同。
【讨论】:
【参考方案5】:a = [1,2,3,4,7,8,9,10,17]
b = [5,6,13,14,15,19,21,23]
c=[]
c.append(a[0]) #add the first element to c
i=True #breaking condition
loop=2 #initially we to choose b
while i:
if loop==2:
val=c[-1]
temp_lis=d([x-val for x in b]) #find the difference between every
# element and the last element of c and
# pick the smallest positive value.
for k,y in enumerate(temp_lis):
if y>0:
c.append(b[k])
break
else:
i=False #use for-else loop for determining the breaking condition
loop=1 #change the loop to 1
elif loop==1:
val=c[-1]
temp_lis=[x-val for x in a]
for k,y in enumerate(temp_lis):
if y>0:
c.append(a[k])
break
else:
i=False
loop=2
print c
输出:
[1, 5, 7, 13, 17, 19]
【讨论】:
以上是关于交错两个 numpy 索引数组,每个数组中的一项的主要内容,如果未能解决你的问题,请参考以下文章
两个 1D numpy / torch 数组的特殊索引以生成另一个数组