如何从每个线程的数组中读取唯一元素?
Posted
技术标签:
【中文标题】如何从每个线程的数组中读取唯一元素?【英文标题】:How to read unique elements from array per thread? 【发布时间】:2013-10-23 07:46:07 【问题描述】:我有一个基于数组的对象,它实现了如下接口:
public interface PairSupplier<Q, E>
public int size();
public Pair<Q, E> get(int index);
我想在它上面创建一个特定的迭代器:
public boolean hasNext()
return true;
public Pair<Q, E> next()
//some magic
在 next 方法中,我想从 PairSupplier 返回一些元素。
这个元素对于线程来说应该是唯一的,其他线程不应该有这个元素。
由于 PairSupplier 有最终尺寸,这种情况并不总是可能的,但我想接近它。
元素的顺序无关紧要,线程可以在不同时间获取相同的元素。
示例:2 Threads
、5 elements
- 1,2,3,4,5
Thread 1 | Thread 2
1 2
3 4
5 1
3 2
4 5
我的解决方案:
我创建了 AtomicInteger 索引,每次下一个调用都会增加它。
PairSupplier pairs;
AtomicInteger index;
public boolean hasNext()
return true;
public Pair<Q, E> next()
int position = index.incrementAndGet() % pairs.size;
if (position < 0)
position *= -1;
position = pairs.size - position;
return pairs.get(position);
pairs 和 index 在所有线程之间共享。
我发现这个解决方案不可扩展(因为所有线程都在递增),也许有人有更好的想法?
此迭代器将由 50-1000 个线程使用。
【问题讨论】:
那是完整的代码吗? 这个example(s) 可能会有所帮助。 @SachinThapa 不,我只想展示问题和可能解决方案的一部分 如何将 JMS 用于非持久消息? 你说:这个元素对于线程来说应该是唯一的,其他线程不应该有这个元素。但是在你的例子中,线程2访问1
已经被线程 1?还是我错过了什么?啊,我的错,只是阅读不同时间的东西。那么怎么知道线程已经完成了对元素的处理呢?
【参考方案1】:
您的问题细节不明确 - 您的示例表明两个线程可以处理相同的Pair
,但您在说明中另有说明。
由于更难实现,我将提供一个Iterable<Pair<Q,E>>
,它将为每个线程提供一个Pair
s,直到供应商周期 - 然后它会重复。
public interface Supplier<T>
public int size();
public T get(int index);
public interface PairSupplier<Q, E> extends Supplier<Pair<Q, E>>
public class IterableSupplier<T> implements Iterable<T>
// The common supplier to use across all threads.
final Supplier<T> supplier;
// The atomic counter.
final AtomicInteger i = new AtomicInteger();
public IterableSupplier(Supplier<T> supplier)
this.supplier = supplier;
@Override
public Iterator<T> iterator()
/**
* You may create a NEW iterator for each thread while they all share supplier
* and Will therefore distribute each Pair between different threads.
*
* You may also share the same iterator across multiple threads.
*
* No two threads will get the same pair twice unless the sequence cycles.
*/
return new ThreadSafeIterator();
private class ThreadSafeIterator implements Iterator<T>
@Override
public boolean hasNext()
/**
* Always true.
*/
return true;
private int pickNext()
// Just grab one atomically.
int pick = i.incrementAndGet();
// Reset to zero if it has exceeded - but no spin, let "just someone" manage it.
int actual = pick % supplier.size();
if (pick != actual)
// So long as someone has a success before we overflow int we're good.
i.compareAndSet(pick, actual);
return actual;
@Override
public T next()
return supplier.get(pickNext());
@Override
public void remove()
throw new UnsupportedOperationException("Remove not supported.");
注意:我已经稍微调整了代码以适应这两种情况。您可以为每个线程获取一个Iterator
,也可以跨线程共享一个Iterator
。
【讨论】:
你的解决方案看起来不错,我也更喜欢原子索引的想法,但我试图找到没有同步的东西。另一位用户建议使用存储桶,但这不是我的情况(阅读以前的答案)。可能是随机函数可以解决我的问题吗?但我不知道如何在多线程环境中使用随机。 @MaryRyllo - 这里没有同步 - 你不太可能找到更好的东西,但一定要研究其他选项。请注意,我已经调整了代码以适应任何一种情况。【参考方案2】:您有一条必须在所有线程之间共享的信息(“有人已经使用了这个Pair
吗?”)。所以对于一般情况,你被卡住了。但是,如果您知道数组的大小和线程数,您可以使用存储桶来减少痛苦。
假设我们知道将有 1,000,000 个数组元素和 1,000 个线程。为每个线程分配一个范围(线程#1 获取元素 0-999 等)。现在不再是 1,000 个线程争用一个 AtomicInteger,您可以完全没有争用!
如果您可以确定所有线程将以大致相同的速度运行,则此方法有效。如果您需要处理有时线程 #1 忙于做其他事情而线程 #2 空闲的情况,您可以稍微修改您的存储桶模式:每个存储桶都有一个 AtomicInteger。现在线程一般只会与自己竞争,但如果他们的桶是空的,他们可以移动到下一个桶。
【讨论】:
我想到了桶,这是个好主意(+1 到 exp),但我不知道线程数。它可以动态增长。而且我想尽可能多地使用对,并尽量避免使用相同的对。 有些桶比一个更好(你现在拥有的)。如果线程数量超出您的猜测,您可以锁定整个 PairSupplier 并重新存储桶(但这几乎肯定会更昂贵)。您可以让每个next()
根据线程数返回多个元素(这有点类似于动态存储桶)。
我喜欢它 - 一个可调整大小的存储桶列表 :)
@NathanielWaisbrot 我的应用程序模拟 TPS 负载。为了保持准确的 tps 计数,我添加或删除了一些线程。所以线程数经常改变=(【参考方案3】:
我无法理解您要解决的问题是什么?
每个线程是否处理整个集合?
是否担心没有两个线程可以同时在同一个 Pair 上工作?但是每个线程都需要处理集合中的每一个Pair?
或者您是否希望使用所有线程处理一次集合?
【讨论】:
【参考方案4】:在您的示例中,有一个关键的事情是模糊的 - 这到底是什么意思?
元素的顺序无关紧要,线程可以在不同的时间获取相同的元素。
“不同的时间”是什么意思?在 N 毫秒之内?这是否意味着绝对两个线程永远不会同时接触同一对?我会假设的。
如果您想降低线程相互阻塞以争夺同一个 Pair 的概率,并且有一个 Pair 的后备数组,请尝试以下操作:
将您的数组划分为numPairs / threadCount
子数组(您不必实际创建子数组,只需从不同的偏移量开始 - 但将其视为子数组更容易)
将每个线程分配给不同的子数组;当线程耗尽其子数组时,增加其子数组的索引
假设我们有 6 个对和 2 个线程 - 您的分配看起来像 Thread-1:[0,1,2] Thread-2:[3,4,5]。当线程 1 启动时,它会查看与线程 2 不同的 Pairs 集合,因此它们不太可能会争夺同一 Pairs
如果两个线程真的不能同时接触 Pair 很重要,则将接触 Pair 对象的所有代码包装在 synchronized(pair)
中(在 实例上同步,不是类型!) - 可能偶尔会阻塞,但你永远不会阻塞单个事物上的所有线程,就像 AtomicInteger
一样 - 线程只能相互阻塞,因为它们真的试图触摸同一个对象
请注意,这不保证永远不会阻塞 - 为此,所有线程必须以完全相同的速度运行,并且处理每个 Pair 对象必须花费完全相同的时间,并且操作系统的线程调度程序必须永远不会从一个线程而不是另一个线程中窃取时间。你不能假设任何这些事情。这为您提供了更高的概率,通过划分工作区域并使共享的最小状态单元成为锁,您将获得更好的并发性。
但这是在数据结构上获得更多并发性的常用模式 - 在线程之间划分数据,以便它们很少同时接触同一个锁。
【讨论】:
“这是否意味着绝对两个线程永远不会同时接触同一个 Pair” - 是的,我希望尽可能地做到这一点 =)【参考方案5】:最容易看到的是创建哈希集或映射,并为每个线程提供唯一的哈希。之后,只需通过此哈希码进行简单获取。
【讨论】:
【参考方案6】:这是标准的 java 信号量使用问题。以下 javadoc 给出了与您的问题几乎相似的示例。 http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html
如果您需要更多帮助,请告诉我?
【讨论】:
【参考方案7】:我更喜欢锁定和释放过程。
如果一个线程正在请求一个对对象,则该对对象将从供应商处移除。在线程请求新对之前,“旧”对再次添加到供应商。
你可以从前面推到最后。
【讨论】:
以上是关于如何从每个线程的数组中读取唯一元素?的主要内容,如果未能解决你的问题,请参考以下文章
如何从数组中读取特定元素,该元素是来自 mongodb 的车把文件中对象的一部分?