如何实现 PriorityBlockingQueue 的循环排序?
Posted
技术标签:
【中文标题】如何实现 PriorityBlockingQueue 的循环排序?【英文标题】:How to implement round-robin order for PriorityBlockingQueue? 【发布时间】:2015-01-02 05:33:13 【问题描述】:我有一个PriorityBlockingQueue
。单个线程一次使用此队列中的一条消息并对其进行处理。其他几个线程正在将消息插入队列。生产者线程为他们提交的每条消息分配一个完整的优先级。静态AtomicLong
用于为每条消息分配一个唯一的、单调递增的ID。队列的Comparator
先按此优先级对消息进行排序,然后同等优先级的消息按ID排序(ID最低的在前)。
问题:有时一个生产者会提交大量消息。然后,这会使其他生产者无法处理他们的消息。我想做的是在生产者之间进行消费者循环以获取相同优先级的消息(同时仍按提交顺序处理单个生产者的同等优先级消息)。但我不知道如何编写 Comparator
来做到这一点。
我考虑的另一种选择是为每个生产者设置一个单独的队列。但是,我认为这行不通,因为我不知道单线程在多个队列上等待的任何方式。
【问题讨论】:
【参考方案1】:我觉得为每个生产者使用一个Queue
来实现这一点更简单。一个线程不能等待多个Queue
s,但您可以将所有Queue
s 合并到一个辅助类中,这样就不需要了。
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.GuardedBy;
public class RoundRobin<P, E>
private final Lock lock = new ReentrantLock();
private final Condition added = lock.newCondition();
@GuardedBy("lock") private final Map<P, Queue<E>> queues = new LinkedHashMap<>();
public boolean add(P producer, E item)
lock.lock();
try
if (!queues.containsKey(producer))
queues.put(producer, new PriorityBlockingQueue<>());
added.signalAll();
return queues.get(producer).add(item);
finally
lock.unlock();
public Iterator<E> roundRobinIterator()
return new Iterator<E>()
private Iterator<? extends Queue<E>> i = null;
private boolean singlePass = true;
@Override
public boolean hasNext()
return true;
@Override
public E next()
lock.lock();
try
while (true)
if (i == null || !i.hasNext())
i = queues.values().iterator();
singlePass = true;
while (i.hasNext())
Queue<E> q = i.next();
if (!q.isEmpty())
if (singlePass)
// copy the iterator to prevent
// ConcurrentModificationExceptions
singlePass = false;
i = copy(i);
return q.poll();
if (singlePass)
// If singlePass is true then we just checked every
// queue and they were all empty.
// Wait for another element to be added.
added.await();
catch (InterruptedException e)
throw new NoSuchElementException(e.getMessage());
finally
lock.unlock();
private <T> Iterator<? extends T> copy(Iterator<? extends T> i)
List<T> copy = new ArrayList<>();
while (i.hasNext())
copy.add(i.next());
return copy.iterator();
;
【讨论】:
我喜欢这个想法(特别是使用 ReenterantLock 和 Condition)。我会尝试类似的东西。 由于外部有 ReentrantLock,因此内部不需要 PriorityBlockingQueue,PriorityQueue 就可以了——双重锁定会带来更差的性能和更大的死锁风险(不是您当前的代码,但将来可能会更改)。 另外,当我需要更高的优先级来应用不同生产者的消息时,您的解决方案为每个生产者都有一个单独的优先级队列。你有 LinkedHashMap我想我会这样做:
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
public class RRQueue<M>
private final ThreadLocal<Queue<M>> threadQueue = new ThreadLocal<>();
private final List<Queue<M>> queues;
private int current = 0;
public RRQueue()
this.queues = new ArrayList<>();
public synchronized void add(M msg)
Queue<M> queue = threadQueue.get();
if (queue == null)
queue = new LinkedList<>(); // or whatever
queues.add(queue);
threadQueue.set(queue);
queue.add(msg);
notify();
public synchronized M get() throws InterruptedException
while (true)
for (int i = 0; i < queues.size(); ++i)
Queue<M> queue = queues.get(current);
current = (current+1)%queues.size();
if (!queue.isEmpty())
return queue.remove();
wait();
【讨论】:
我认为这可行。但是,它没有解决优先级问题。【参考方案3】:这完全取决于您如何分配 ID。将它们分开分配 N,其中 N 是生产者的数量,并将每个生产者的索引添加到其中。然后按顺序读取它们将产生循环顺序。您需要做一点记账才能知道何时增加底层 ID,这将在您达到任何 x 的 Nx-1 时发生。
【讨论】:
有两个生产者,你是说给第一个生产者分配偶数ID,给第二个生产者分配奇数ID?假设生产者 #1 提交了 10 条消息,这些消息已完全处理 - 它使用前十个奇数 ID。一段时间后,生产者 #1 提交了 10 条消息,这些消息获得了接下来的 10 个奇数 ID。同时,生产者#2 提交了 10 条消息,这些消息消耗了前 10 个偶数 ID。这不会导致生产者 #2 的第一批在生产者 #1 的第二批之前处理吗?这不是未完成消息的循环。 没有。根据我的最后一句话,您将增加基础 ID,因此在您的示例中,ID 将运行 0,2,4,6,8,10,12,14,16,18,19, ... 所以你会有一个计数器,它以 N 递增(其中 N 是当前生产者计数),以及一个大小为 N 的布尔标志数组,最初都是假的。当生产者 x 请求一个 ID 时,如果 flags[x] 为 false,则设置 flags[x]=true;如果 flags[x] 为真,则将 counter 增加 N,将所有标志设置为 false,设置 flags[x]=true;那么,无论哪种情况,都返回 counter+x?我还假设一个同步块来保护计数器和数组。此外,要动态更改生产者计数,需要调整数组大小,将其所有值设置为 false,递增计数器。 记账的细节我还没想好,也不一定只有这样,但我给的顺序就是你想要的,是吗? 假设生产者 #1 提交了 100 条消息。到目前为止,已经处理了其中的 50 个。现在生产者#2 提交了 100 条消息。由于增加了底层 ID,生产者 #2 的所有消息的 ID 号是否都高于生产者 #1 的 ID 号,所以在生产者 #1 完成之前不会处理它们吗?这不是循环。【参考方案4】:可以使用 PriorityBlockingQueue 实现 N 个生产者之间的循环,如下所示。
为每个生产者维护 N 个 AtomicInteger 计数器和一个全局计数器,以防生产者计数器相等。
当添加到生产者的 Q incement 计数器以及全局计数器并存储到 Q 对象中时。 Q 的比较器将根据生产者计数器 1st 排序,然后是存储在 Q 对象中的全局计数器值。
但是,当一个生产者的对象在 Q 中的某个时间为空时,相应的计数器就会落后,并且当对象开始进入时它将开始占用 Q。
为避免这种情况,请维护一个 volatile 变量,该变量在出队时使用对象的生产者计数器进行更新。如果值小于 volatile 变量中最后一个出队的计数器,则在入队期间增加生产者计数器后,将计数器重置为该值 + 1。
【讨论】:
以上是关于如何实现 PriorityBlockingQueue 的循环排序?的主要内容,如果未能解决你的问题,请参考以下文章