通过阻塞队列实现生产者和消费者异步解耦
Posted zhangfengshi
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了通过阻塞队列实现生产者和消费者异步解耦相关的知识,希望对你有一定的参考价值。
生产者消费者模式是并发、多线程编程中经典的设计模式,生产者和消费者通过分离的执行工作解耦,简化了开发模式,生产者和消费者可以以不同的速度生产和消费数据。这篇文章我们来看看什么是生产者消费者模式,这个问题也是多线程面试题中经常被提及的。如何使用阻塞队列(Blocking Queue)解决生产者消费者模式,以及使用生产者消费者模式的好处。
真实世界中的生产者消费者模式
生产者和消费者模式在生活当中随处可见,它描述的是协调与协作的关系。比如一个人正在准备食物(生产者),而另一个人正在吃(消费者),他们使用一个共用的桌子用于放置盘子和取走盘子,生产者准备食物,如果桌子上已经满了就等待,消费者(那个吃的)等待如果桌子空了的话。这里桌子就是一个共享的对象。在Java Executor框架自身实现了生产者消费者模式它们分别负责添加和执行任务。
生产者消费者模式的好处
它的确是一种实用的设计模式,常用于编写多线程或并发代码。下面是它的一些优点:
-
它简化的开发,你可以独立地或并发的编写消费者和生产者,它仅仅只需知道共享对象是谁
-
生产者不需要知道谁是消费者或者有多少消费者,对消费者来说也是一样
-
生产者和消费者可以以不同的速度执行
-
分离的消费者和生产者在功能上能写出更简洁、可读、易维护的代码
多线程中的生产者消费者问题
生产者消费者问题是一个流行的面试题,面试官会要求你实现生产者消费者设计模式,以至于能让生产者应等待如果队列或篮子满了的话,消费者等待如果队列或者篮子是空的。这个问题可以用不同的方式来现实,经典的方法是使用wait和notify方法在生产者和消费者线程中合作,在队列满了或者队列是空的条件下阻塞,Java5的阻塞队列(BlockingQueue)数据结构更简单,因为它隐含的提供了这些控制,现在你不需要使用wait和nofity在生产者和消费者之间通信了,阻塞队列的put()方法将阻塞如果队列满了,队列take()方法将阻塞如果队列是空的。在下部分我们可以看到代码例子。
先回顾一下知识点阻塞队列BlockingQueue
常用实现队列简介:
- ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
- LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
- SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
- PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
下面是实现代码 目的是解决生产者和消费者的异步解耦 完全透明
public class Producer implements Runnable { BlockingQueue blockingQueue = null; public Producer(BlockingQueue blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { for (int i = 0; i < 5; i++) { try { System.out.println("producer:" + i); blockingQueue.put(i); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
下面是消费者:
public class Customer implements Runnable { BlockingQueue blockingQueue = null; public Customer(BlockingQueue blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { // TODO Auto-generated method stub while (true) { try { System.out.println("customer:" + blockingQueue.take()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
客户单demo
public class ClientDemo { public static void main(String[] args) { BlockingQueue blockingQueue = new LinkedBlockingQueue<>(); Producer producer = new Producer(blockingQueue); Customer cusThread = new Customer(blockingQueue); Thread producerThread = new Thread(producer); Thread customerThread = new Thread(cusThread); producerThread.start(); customerThread.start(); } }
需要解释一下 里面的两个属性:
/** * BlockingQueue.put方法 public void put(E e) throws InterruptedException 将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。 指定者: 接口 BlockingQueue<E> 中的 put 参数: e - 要添加的元素 抛出: InterruptedException - 如果在等待时被中断 NullPointerException - 如果指定元素为 null */
BlockingQueue.put
=====================
BlockingQueue.take
/** * take public E take() throws InterruptedException 从接口 BlockingQueue 复制的描述 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。 指定者: 接口 BlockingQueue<E> 中的 take 返回: 此队列的头部 抛出: InterruptedException - 如果在等待时被中断 */ blockingQueue.take(); public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
需要注意的是 put 和take都是阻塞的 在高并发场景下 不利于吞吐率的提升
以上是关于通过阻塞队列实现生产者和消费者异步解耦的主要内容,如果未能解决你的问题,请参考以下文章
分布式系统--封装Redis消息队列--消息队列下的异步场景