如何实现 PriorityBlockingQueue 的循环排序?

Posted

技术标签:

【中文标题】如何实现 PriorityBlockingQueue 的循环排序?【英文标题】:How to implement round-robin order for PriorityBlockingQueue? 【发布时间】:2015-01-02 05:33:13 【问题描述】:

我有一个PriorityBlockingQueue。单个线程一次使用此队列中的一条消息并对其进行处理。其他几个线程正在将消息插入队列。生产者线程为他们提交的每条消息分配一个完整的优先级。静态AtomicLong 用于为每条消息分配一个唯一的、单调递增的ID。队列的Comparator先按此优先级对消息进行排序,然后同等优先级的消息按ID排序(ID最低的在前)。

问题:有时一个生产者会提交大量消息。然后,这会使其他生产者无法处理他们的消息。我想做的是在生产者之间进行消费者循环以获取相同优先级的消息(同时仍按提交顺序处理单个生产者的同等优先级消息)。但我不知道如何编写 Comparator 来做到这一点。

我考虑的另一种选择是为每个生产者设置一个单独的队列。但是,我认为这行不通,因为我不知道单线程在多个队列上等待的任何方式。

【问题讨论】:

【参考方案1】:

我觉得为每个生产者使用一个Queue 来实现这一点更简单。一个线程不能等待多个Queues,但您可以将所有Queues 合并到一个辅助类中,这样就不需要了。

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import javax.annotation.concurrent.GuardedBy;

public class RoundRobin<P, E> 
    private final Lock lock = new ReentrantLock();
    private final Condition added = lock.newCondition();

    @GuardedBy("lock") private final Map<P, Queue<E>> queues = new LinkedHashMap<>();

    public boolean add(P producer, E item) 
        lock.lock();
        try 
            if (!queues.containsKey(producer)) 
                queues.put(producer, new PriorityBlockingQueue<>());
            

            added.signalAll();
            return queues.get(producer).add(item);
         finally 
            lock.unlock();
        
    

    public Iterator<E> roundRobinIterator() 
        return new Iterator<E>() 
            private Iterator<? extends Queue<E>> i = null;
            private boolean singlePass = true;

            @Override
            public boolean hasNext() 
                return true;
            

            @Override
            public E next() 
                lock.lock();
                try 
                    while (true) 
                        if (i == null || !i.hasNext()) 
                            i = queues.values().iterator();
                            singlePass = true;
                        

                        while (i.hasNext()) 
                            Queue<E> q = i.next();
                            if (!q.isEmpty()) 
                                if (singlePass) 
                                    // copy the iterator to prevent
                                    // ConcurrentModificationExceptions
                                    singlePass = false;
                                    i = copy(i);
                                
                                return q.poll();
                            
                        

                        if (singlePass) 
                            // If singlePass is true then we just checked every
                            // queue and they were all empty.
                            // Wait for another element to be added.
                            added.await();
                        
                    
                 catch (InterruptedException e) 
                    throw new NoSuchElementException(e.getMessage());
                 finally 
                    lock.unlock();
                
            

            private <T> Iterator<? extends T> copy(Iterator<? extends T> i) 
                List<T> copy = new ArrayList<>();
                while (i.hasNext()) 
                    copy.add(i.next());
                
                return copy.iterator();
            
        ;
    

【讨论】:

我喜欢这个想法(特别是使用 ReenterantLock 和 Condition)。我会尝试类似的东西。 由于外部有 ReentrantLock,因此内部不需要 PriorityBlockingQueue,PriorityQueue 就可以了——双重锁定会带来更差的性能和更大的死锁风险(不是您当前的代码,但将来可能会更改)。 另外,当我需要更高的优先级来应用不同生产者的消息时,您的解决方案为每个生产者都有一个单独的优先级队列。你有 LinkedHashMap;我最终基本上做了 Map>。我接受你的回答,因为它让我在正确的轨道上思考,并且最接近我最终实际做的事情。【参考方案2】:

我想我会这样做:

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

public class RRQueue<M> 
    private final ThreadLocal<Queue<M>> threadQueue = new ThreadLocal<>();
    private final List<Queue<M>> queues;
    private int current = 0;

    public RRQueue() 
        this.queues = new ArrayList<>();
    

    public synchronized void add(M msg) 
        Queue<M> queue = threadQueue.get();
        if (queue == null) 
            queue = new LinkedList<>(); // or whatever
            queues.add(queue);
            threadQueue.set(queue);
        
        queue.add(msg);
        notify();
    

    public synchronized M get() throws InterruptedException 
        while (true) 
            for (int i = 0; i < queues.size(); ++i) 
                Queue<M> queue = queues.get(current);
                current = (current+1)%queues.size();
                if (!queue.isEmpty()) 
                    return queue.remove();
                
            
            wait();
        
    

【讨论】:

我认为这可行。但是,它没有解决优先级问题。【参考方案3】:

这完全取决于您如何分配 ID。将它们分开分配 N,其中 N 是生产者的数量,并将每个生产者的索引添加到其中。然后按顺序读取它们将产生循环顺序。您需要做一点记账才能知道何时增加底层 ID,这将在您达到任何 xNx-1 时发生。

【讨论】:

有两个生产者,你是说给第一个生产者分配偶数ID,给第二个生产者分配奇数ID?假设生产者 #1 提交了 10 条消息,这些消息已完全处理 - 它使用前十个奇数 ID。一段时间后,生产者 #1 提交了 10 条消息,这些消息获得了接下来的 10 个奇数 ID。同时,生产者#2 提交了 10 条消息,这些消息消耗了前 10 个偶数 ID。这不会导致生产者 #2 的第一批在生产者 #1 的第二批之前处理吗?这不是未完成消息的循环。 没有。根据我的最后一句话,您将增加基础 ID,因此在您的示例中,ID 将运行 0,2,4,6,8,10,12,14,16,18,19, ... 所以你会有一个计数器,它以 N 递增(其中 N 是当前生产者计数),以及一个大小为 N 的布尔标志数组,最初都是假的。当生产者 x 请求一个 ID 时,如果 flags[x] 为 false,则设置 flags[x]=true;如果 flags[x] 为真,则将 counter 增加 N,将所有标志设置为 false,设置 flags[x]=true;那么,无论哪种情况,都返回 counter+x?我还假设一个同步块来保护计数器和数组。此外,要动态更改生产者计数,需要调整数组大小,将其所有值设置为 false,递增计数器。 记账的细节我还没想好,也不一定只有这样,但我给的顺序就是你想要的,是吗? 假设生产者 #1 提交了 100 条消息。到目前为止,已经处理了其中的 50 个。现在生产者#2 提交了 100 条消息。由于增加了底层 ID,生产者 #2 的所有消息的 ID 号是否都高于生产者 #1 的 ID 号,所以在生产者 #1 完成之前不会处理它们吗?这不是循环。【参考方案4】:

可以使用 PriorityBlockingQueue 实现 N 个生产者之间的循环,如下所示。

为每个生产者维护 N 个 AtomicInteger 计数器和一个全局计数器,以防生产者计数器相等。

当添加到生产者的 Q incement 计数器以及全局计数器并存储到 Q 对象中时。 Q 的比较器将根据生产者计数器 1st 排序,然后是存储在 Q 对象中的全局计数器值。

但是,当一个生产者的对象在 Q 中的某个时间为空时,相应的计数器就会落后,并且当对象开始进入时它将开始占用 Q。

为避免这种情况,请维护一个 volatile 变量,该变量在出队时使用对象的生产者计数器进行更新。如果值小于 volatile 变量中最后一个出队的计数器,则在入队期间增加生产者计数器后,将计数器重置为该值 + 1。

【讨论】:

以上是关于如何实现 PriorityBlockingQueue 的循环排序?的主要内容,如果未能解决你的问题,请参考以下文章

如何快速自己实现Map

API 面试四连杀:接口如何设计?安全如何保证?签名如何实现?防重如何实现?

delphi 中如何实现对象之间的数据共享

C#如何实现进程单例运行

Android如何实现弹幕效果

如何实现两个div之间的连线,我现在有这个需求,请问如何实现