Java - 多队列生产者消费者
Posted
技术标签:
【中文标题】Java - 多队列生产者消费者【英文标题】:Java - Multiple queue producer consumer 【发布时间】:2014-11-14 18:09:55 【问题描述】:我有以下代码:
while(!currentBoard.boardIsValid())
for (QueueLocation location : QueueLocation.values())
while(!inbox.isEmpty(location))
Cell c = inbox.dequeue(location);
notifyNeighbours(c.x, c.y, c.getCurrentState(),previousBoard);
我有一个有几个队列的消费者(他们的所有方法都是同步的)。每个生产者一个队列。消费者循环遍历所有队列并检查他们是否有任务供他消费。 如果他正在检查的队列中有一个任务,他就会使用它。否则,他会去检查下一个队列,直到他完成对所有队列的迭代。
到目前为止,如果他遍历所有队列并且它们都是空的,他会继续循环而不是等待其中一个包含某些内容(如外部 while
所见)。
如何让消费者等到其中一个队列中有东西?
我遇到以下情况的问题:假设只有 2 个队列。消费者检查了第一个,它是空的。就在他检查第二个(也是空的)时,生产者在第一个队列中放了一些东西。就消费者而言,队列都是空的,所以他应该等待(即使其中一个不再是空的,他应该继续循环)。
编辑: 最后一件事。这对我来说是一个练习。我正在尝试自己实现同步。因此,如果任何 java 库都有实现此功能的解决方案,我对此不感兴趣。我试图了解如何实现这一点。
【问题讨论】:
除非您可以修改生产者以执行额外的信号/通知,否则有两种解决方案,1) 使用超时并在 CPU 消耗和响应性之间进行权衡,或者 2) 使用额外的线程在队列中等待并通知消费者。第二种方法的额外线程会消耗更多内存,但不会占用大量 CPU 时间,因为它们大部分时间都在等待。 【参考方案1】:@Abe 很接近。我会使用信号并等待 - 使用内置的 Object 类,因为它们是最轻的。
Object sync = new Object(); // Can use an existing object if there's an appropriate one
// On submit to queue
synchronized ( sync )
queue.add(...); // Must be inside to avoid a race condition
sync.notifyAll();
// On check for work in queue
synchronized ( sync )
item = null;
while ( item == null )
// Need to check all of the queues - if there will be a large number, this will be slow,
// and slow critical sections (synchronized blocks) are very bad for performance
item = getNextQueueItem();
if ( item == null )
sync.wait();
注意sync.wait
释放同步锁直到通知 - 同步锁是成功调用等待方法所必需的(这是对程序员的提醒,它确实需要某种类型的临界区才能工作可靠)。
顺便说一句,如果可行的话,我会推荐一个专用于消费者(或一组消费者)的队列,而不是一个专用于生产者的队列。它将简化解决方案。
【讨论】:
【参考方案2】:如果您想跨多个队列进行阻塞,那么一种选择是使用 java 的Lock
and Condition
objects and then use the signal
method。
所以每当生产者有数据时,它应该调用signallAll
。
Lock fileLock = new ReentrantLock();
Condition condition = fileLock.newCondition();
...
// producer has to signal
condition.signalAll();
...
// consumer has to await.
condition.await();
只有在提供信号时,消费者才会去检查队列。
【讨论】:
【参考方案3】:我按照@Abe 的建议解决了类似的情况,但最终决定将Semaphore
与AtomicBoolean
结合使用,并将其称为BinarySemaphore。它确实需要对生产者进行修改,以便他们在有事情要做时发出信号。
下面是 BinarySemaphore 的代码和消费者工作循环应该是什么样子的一般概念:
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class MultipleProdOneConsumer
BinarySemaphore workAvailable = new BinarySemaphore();
class Consumer
volatile boolean stop;
void loop()
while (!stop)
doWork();
if (!workAvailable.tryAcquire())
// waiting for work
try
workAvailable.acquire();
catch (InterruptedException e)
if (!stop)
// log error
void doWork()
void stopWork()
stop = true;
workAvailable.release();
class Producer
/* Must be called after work is added to the queue/made available. */
void signalSomethingToDo()
workAvailable.release();
class BinarySemaphore
private final AtomicBoolean havePermit = new AtomicBoolean();
private final Semaphore sync;
public BinarySemaphore()
this(false);
public BinarySemaphore(boolean fair)
sync = new Semaphore(0, fair);
public boolean release()
boolean released = havePermit.compareAndSet(false, true);
if (released)
sync.release();
return released;
public boolean tryAcquire()
boolean acquired = sync.tryAcquire();
if (acquired)
havePermit.set(false);
return acquired;
public boolean tryAcquire(long timeout, TimeUnit tunit) throws InterruptedException
boolean acquired = sync.tryAcquire(timeout, tunit);
if (acquired)
havePermit.set(false);
return acquired;
public void acquire() throws InterruptedException
sync.acquire();
havePermit.set(false);
public void acquireUninterruptibly()
sync.acquireUninterruptibly();
havePermit.set(false);
【讨论】:
以上是关于Java - 多队列生产者消费者的主要内容,如果未能解决你的问题,请参考以下文章
Java并发多线程编程——生产者消费者模式示例(阻塞队列版本)