Java - 多队列生产者消费者

Posted

技术标签:

【中文标题】Java - 多队列生产者消费者【英文标题】:Java - Multiple queue producer consumer 【发布时间】:2014-11-14 18:09:55 【问题描述】:

我有以下代码:

    while(!currentBoard.boardIsValid())
        for (QueueLocation location : QueueLocation.values())
            while(!inbox.isEmpty(location))
                Cell c = inbox.dequeue(location);
                notifyNeighbours(c.x, c.y, c.getCurrentState(),previousBoard);
            
        
    

我有一个有几个队列的消费者(他们的所有方法都是同步的)。每个生产者一个队列。消费者循环遍历所有队列并检查他们是否有任务供他消费。 如果他正在检查的队列中有一个任务,他就会使用它。否则,他会去检查下一个队列,直到他完成对所有队列的迭代。

到目前为止,如果他遍历所有队列并且它们都是空的,他会继续循环而不是等待其中一个包含某些内容(如外部 while 所见)。

如何让消费者等到其中一个队列中有东西?

我遇到以下情况的问题:假设只有 2 个队列。消费者检查了第一个,它是空的。就在他检查第二个(也是空的)时,生产者在第一个队列中放了一些东西。就消费者而言,队列都是空的,所以他应该等待(即使其中一个不再是空的,他应该继续循环)。

编辑: 最后一件事。这对我来说是一个练习。我正在尝试自己实现同步。因此,如果任何 java 库都有实现此功能的解决方案,我对此不感兴趣。我试图了解如何实现这一点。

【问题讨论】:

除非您可以修改生产者以执行额外的信号/通知,否则有两种解决方案,1) 使用超时并在 CPU 消耗和响应性之间进行权衡,或者 2) 使用额外的线程在队列中等待并通知消费者。第二种方法的额外线程会消耗更多内存,但不会占用大量 CPU 时间,因为它们大部分时间都在等待。 【参考方案1】:

@Abe 很接近。我会使用信号并等待 - 使用内置的 Object 类,因为它们是最轻的。

Object sync = new Object();  // Can use an existing object if there's an appropriate one

// On submit to queue
synchronized ( sync ) 
    queue.add(...);  // Must be inside to avoid a race condition
    sync.notifyAll();


// On check for work in queue
synchronized ( sync ) 
    item = null;
    while ( item == null ) 
        // Need to check all of the queues - if there will be a large number, this will be slow,
        // and slow critical sections (synchronized blocks) are very bad for performance
        item = getNextQueueItem();
        if ( item == null ) 
            sync.wait();
        
    

注意sync.wait 释放同步锁直到通知 - 同步锁是成功调用等待方法所必需的(这是对程序员的提醒,它确实需要某种类型的临界区才能工作可靠)。

顺便说一句,如果可行的话,我会推荐一个专用于消费者(或一组消费者)的队列,而不是一个专用于生产者的队列。它将简化解决方案。

【讨论】:

【参考方案2】:

如果您想跨多个队列进行阻塞,那么一种选择是使用 java 的Lock and Condition objects and then use the signal method。

所以每当生产者有数据时,它应该调用signallAll

Lock fileLock = new ReentrantLock();
Condition condition = fileLock.newCondition();
...
// producer has to signal
condition.signalAll();
...
// consumer has to await.
condition.await();

只有在提供信号时,消费者才会去检查队列。

【讨论】:

【参考方案3】:

我按照@Abe 的建议解决了类似的情况,但最终决定将SemaphoreAtomicBoolean 结合使用,并将其称为BinarySemaphore。它确实需要对生产者进行修改,以便他们在有事情要做时发出信号。 下面是 BinarySemaphore 的代码和消费者工作循环应该是什么样子的一般概念:

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class MultipleProdOneConsumer 

BinarySemaphore workAvailable = new BinarySemaphore();

class Consumer 

    volatile boolean stop;

    void loop() 

        while (!stop) 
            doWork();
            if (!workAvailable.tryAcquire()) 
                // waiting for work
                try 
                    workAvailable.acquire();
                 catch (InterruptedException e) 
                    if (!stop) 
                        // log error
                    
                
            
        
    

    void doWork() 

    void stopWork() 
        stop = true;
        workAvailable.release();
    


class Producer 

    /* Must be called after work is added to the queue/made available. */
    void signalSomethingToDo() 
        workAvailable.release();
    


class BinarySemaphore 

    private final AtomicBoolean havePermit = new AtomicBoolean();
    private final Semaphore sync;

    public BinarySemaphore() 
        this(false);
    

    public BinarySemaphore(boolean fair) 
        sync = new Semaphore(0, fair);
    

    public boolean release() 

        boolean released = havePermit.compareAndSet(false, true);
        if (released) 
            sync.release();
        
        return released;
    

    public boolean tryAcquire() 

        boolean acquired = sync.tryAcquire();
        if (acquired) 
            havePermit.set(false);
        
        return acquired;
    

    public boolean tryAcquire(long timeout, TimeUnit tunit) throws InterruptedException 

        boolean acquired = sync.tryAcquire(timeout, tunit);
        if (acquired) 
            havePermit.set(false);
        
        return acquired;
    

    public void acquire() throws InterruptedException 

        sync.acquire();
        havePermit.set(false);
    

    public void acquireUninterruptibly() 

        sync.acquireUninterruptibly();
        havePermit.set(false);
    




【讨论】:

以上是关于Java - 多队列生产者消费者的主要内容,如果未能解决你的问题,请参考以下文章

Java多线程-----实现生产者消费者模式的几种方式

Java并发多线程编程——生产者消费者模式示例(阻塞队列版本)

Java多线程的生产者与消费者模型,线程间的通信

Java多线程15:QueueBlockingQueue以及利用BlockingQueue实现生产者/消费者模型

java多线程之自定义消息队列

一、多线程下生产者消费者模式