通过阻塞队列实现生产者和消费者异步解耦

Posted zhangfengshi

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了通过阻塞队列实现生产者和消费者异步解耦相关的知识,希望对你有一定的参考价值。

生产者消费者模式是并发、多线程编程中经典的设计模式,生产者和消费者通过分离的执行工作解耦,简化了开发模式,生产者和消费者可以以不同的速度生产和消费数据。这篇文章我们来看看什么是生产者消费者模式,这个问题也是多线程面试题中经常被提及的。如何使用阻塞队列(Blocking Queue)解决生产者消费者模式,以及使用生产者消费者模式的好处。

 

真实世界中的生产者消费者模式

 

生产者和消费者模式在生活当中随处可见,它描述的是协调与协作的关系。比如一个人正在准备食物(生产者),而另一个人正在吃(消费者),他们使用一个共用的桌子用于放置盘子和取走盘子,生产者准备食物,如果桌子上已经满了就等待,消费者(那个吃的)等待如果桌子空了的话。这里桌子就是一个共享的对象。在Java Executor框架自身实现了生产者消费者模式它们分别负责添加和执行任务。

 

生产者消费者模式的好处

 

它的确是一种实用的设计模式,常用于编写多线程或并发代码。下面是它的一些优点:

 

  1. 它简化的开发,你可以独立地或并发的编写消费者和生产者,它仅仅只需知道共享对象是谁

  2. 生产者不需要知道谁是消费者或者有多少消费者,对消费者来说也是一样

  3. 生产者和消费者可以以不同的速度执行

  4. 分离的消费者和生产者在功能上能写出更简洁、可读、易维护的代码

 

多线程中的生产者消费者问题

 

生产者消费者问题是一个流行的面试题,面试官会要求你实现生产者消费者设计模式,以至于能让生产者应等待如果队列或篮子满了的话,消费者等待如果队列或者篮子是空的。这个问题可以用不同的方式来现实,经典的方法是使用wait和notify方法在生产者和消费者线程中合作,在队列满了或者队列是空的条件下阻塞,Java5的阻塞队列(BlockingQueue)数据结构更简单,因为它隐含的提供了这些控制,现在你不需要使用wait和nofity在生产者和消费者之间通信了,阻塞队列的put()方法将阻塞如果队列满了,队列take()方法将阻塞如果队列是空的。在下部分我们可以看到代码例子。

先回顾一下知识点阻塞队列BlockingQueue

技术分享图片

常用实现队列简介:

  • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
  • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
  • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

下面是实现代码 目的是解决生产者和消费者的异步解耦 完全透明

技术分享图片
public class Producer implements Runnable {

    BlockingQueue blockingQueue = null;

    public Producer(BlockingQueue blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {

        for (int i = 0; i < 5; i++) {

            try {
                System.out.println("producer:" + i);
                blockingQueue.put(i);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

    }

}
View Code

 

 

下面是消费者:

技术分享图片
public class Customer implements Runnable {

    BlockingQueue blockingQueue = null;

    public Customer(BlockingQueue blockingQueue) {

        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        while (true) {
            try {
                System.out.println("customer:" + blockingQueue.take());
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }

    }

}
View Code

 

 

客户单demo

public class ClientDemo {

    public static void main(String[] args) {

        BlockingQueue blockingQueue = new LinkedBlockingQueue<>();

        Producer producer = new Producer(blockingQueue);
        Customer cusThread = new Customer(blockingQueue);
        Thread producerThread = new Thread(producer);
        Thread customerThread = new Thread(cusThread);

        producerThread.start();

        customerThread.start();
    }

}

需要解释一下 里面的两个属性:

 

/**
*
BlockingQueue.put方法
public void put(E e)
         throws InterruptedException
将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。
 

指定者:
接口 BlockingQueue<E> 中的 put
参数:
e - 要添加的元素
抛出:
InterruptedException - 如果在等待时被中断
NullPointerException - 如果指定元素为 null
 

*/
BlockingQueue.put

=====================

 

BlockingQueue.take
/**

*

take
public E take()
       throws InterruptedException
从接口 BlockingQueue 复制的描述
获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
 

指定者:
接口 BlockingQueue<E> 中的 take
返回:
此队列的头部
抛出:
InterruptedException - 如果在等待时被中断
*/

 blockingQueue.take();



  public E take() throws InterruptedException {

        E x;

        int c = -1;

        final AtomicInteger count = this.count;

        final ReentrantLock takeLock = this.takeLock;

        takeLock.lockInterruptibly();

        try {

            while (count.get() == 0) {

                notEmpty.await();

            }

            x = dequeue();

            c = count.getAndDecrement();

            if (c > 1)

                notEmpty.signal();

        } finally {

            takeLock.unlock();

        }

        if (c == capacity)

            signalNotFull();

        return x;

    }

 

 

需要注意的是  put  和take都是阻塞的  在高并发场景下 不利于吞吐率的提升 



以上是关于通过阻塞队列实现生产者和消费者异步解耦的主要内容,如果未能解决你的问题,请参考以下文章

用阻塞队列实现生产者消费者模式一(单线程消费)

阻塞队列LinkedBlockQueue

Java之阻塞队列深度剖析

分布式系统--封装Redis消息队列--消息队列下的异步场景

分布式系统--封装Redis消息队列--消息队列下的异步场景

JavaWeb 基础知识 --多线程(阻塞队列+生产消费者模型)