阻塞队列和生产者-消费者模式

Posted longfurcat

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了阻塞队列和生产者-消费者模式相关的知识,希望对你有一定的参考价值。

何为阻塞队列,其与普通队列有何差别?

  总的来说,就是能够在适当的时候阻塞"存"和"取"两个操作,以达到控制任务流程的效果。阻塞队列提供了可阻塞的put和take方法。如果队列已经满了,那么put方法将阻塞直到有空间可用;如果队列为空,那么take方法将会阻塞直到有元素可用。

阻塞队列接口及实现来自于Java并发包(java.util.concurrent),常见的实现有LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue

 

生产者-消费者模式

  生产者-消费者模式是非常常见的设计模式。该模式将"找出需要完成的工作"与"执行工作"这两个过程分离开来,并把工作项放入一个"待完成"列表中以便在随后处理,而不是找出后立即处理。生产者-消费者模式能简化开发过程,因为它消除了生产者类与消费者类之间的代码依赖性,此外,该模式还将生产数据的过程与使用数据的过程解耦开来以简化工作负载的管理,因为这两个过程在处理数据的速率上有所不同。

 技术分享图片

 

阻塞队列对于生产者-消费者模式有何裨益?

  生产者-消费者模式都是基于队列的。就说说普通的有界队列存在的问题吧,队列存在"满"和"空"的问题,如果队列已满,那生产者继续往队列里存数据就会出问题,存不进去要如何处理,生产者代码中就要有相应的处理代码。同样的,如果队列为空,消费者取不到数据又要如何反应。而阻塞队列,就可以在"存不进"和"取不出"的时候,直接阻塞操作,生产者和消费者代码直接阻塞在存取操作上。当然这种阻塞并不是永久的,就拿生产者来说吧,如果因为"存不进"而阻塞的话,只要消费者取出数据,便会"通知"生产者就能继续生产并存储数据。这样就能极大地简化生产者-消费者的编码。

  值得一提的是,阻塞队列还能提供更灵活的选项:offer(对应put)和 poll(对应take)

boolean offer(E e);

boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

  如果数据项不能被添加到队列中,将返回一个失败状态。而不必一直阻塞下去。这样你就可以选择让生产者做点其他的事。但是一般情况下,如果队列充满,很有可能是因为

V生>V消,以至于数据项囤积,如果任其阻塞,则生产者可能被长时间搁置,浪费资源,利用率降低。这时候就要使用一些灵活的策略进行调控,例如减去负载,将多余的工作项序列化并写入磁盘,减少生产者线程的数量,或者通过某种方式来抑制生产者线程。

 

使用示例

示例说明:本示例模拟一个生产-消费环境,工厂生产可乐,肥宅消费。这里对于生产者的调控比较粗暴,直接新建或中断一个生产者任务。

public class BlockingQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<CocaCola> queue = new ArrayBlockingQueue<>(100); //容量100的队列
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            exec.execute(new Producer(queue, exec));
        }
        TimeUnit.SECONDS.sleep(3); //先生产一点库存
        for (int i = 0; i < 5; i++) {
            exec.execute(new FatIndoorsman(queue, exec));
        }
    }
}

class CocaCola { //可口可乐

}

class Producer implements Runnable {
    private static int counter = 0;
    private final int id = counter++;
    private static List<Producer> producers = new ArrayList<>(); //类管理其实例列表
    private Executor exec;
    private BlockingQueue queue;

    public Producer(BlockingQueue queue, Executor exec) {
        this.queue = queue;
        this.exec = exec;
        producers.add(this);
    }

    public synchronized static void adjust(int flag, BlockingQueue queue, Executor exec) { // 1 添加  -1减少
        if (flag == 1) {
            Producer producer = new Producer(queue, exec); //添加的生产者共享同一个队列
            exec.execute(producer);
        } else if (flag == -1) {
            Producer producer = producers.remove(0);
            producer.cancel();
        }
    }

    private void cancel() { //利用中断取消生产任务
        Thread.currentThread().interrupt();
    }

    @Override
    public void run() {
        while (!Thread.interrupted()) {
            try {
                TimeUnit.SECONDS.sleep(1); //模拟生产需耗时1秒
                boolean success = queue.offer(new CocaCola()); //通过offer尝试添加
                if (!success) { //如果队列已满,则移除1个生产者
                    System.out.println("remove a producer");
                    adjust(-1, queue, exec);
                }
                System.out.println(this + " produced a coca-cola!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(this + " is stoped!");
    }

    @Override
    public String toString() {
        return "Producer[" + id + "]";
    }
}

class FatIndoorsman implements Runnable {
    private static int counter = 0;
    private final int id = counter++;
    private BlockingQueue queue;
    private Executor exec;

    public FatIndoorsman(BlockingQueue queue, Executor exec) {
        this.queue = queue;
        this.exec = exec;
    }

    @Override
    public void run() {
        while (!Thread.interrupted()) {
            CocaCola cocaCola = (CocaCola) queue.poll();
            if (cocaCola != null) {
                try {
                    TimeUnit.SECONDS.sleep(10); //模拟肥宅每隔10秒要喝一瓶
                    System.out.println(this + " drink a coca-cola");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                Producer.adjust(1, queue, exec); //添加生产者
            }
        }
    }

    @Override
    public String toString() {
        return "FatIndoorsman[" + id + "]";
    }
}

 

 

 

阻塞队列是如何实现的,即如何进行阻塞?

  那ArrayBlockingQueue来说,其他实现类实现阻塞的方式应该类似。ArrayBlockingQueue用两个Condition对象来控制take和put操作的访问。如果take和put某一方失败,则调用对应Condition的await方法,进行阻塞。如果某一方成功(即成功调用dequeue或enqueue方法),则释放信号及时通知对方。

以下是ArrayBlockingQueue.java的部分源码截图

技术分享图片

技术分享图片

 

技术分享图片

技术分享图片

 

技术分享图片

 

以上是关于阻塞队列和生产者-消费者模式的主要内容,如果未能解决你的问题,请参考以下文章

通过阻塞队列实现生产者和消费者异步解耦

用阻塞队列实现生产者消费者模式一(单线程消费)

生产者消费者模式

用阻塞队列和线程池简单实现生产者和消费者场景

生产者/消费者模式(阻塞队列)

阻塞队列