并发编程中的阻塞队列概述
Posted gocode
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程中的阻塞队列概述相关的知识,希望对你有一定的参考价值。
1.简介
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。 1)支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。 2)支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。 阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。
在阻塞队列不可用时,插入、移除这两个附加操作提供4种处理方式:
4种处理试的说明:
抛出异常:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException("Queue full")异常。当队列空时,从队列里获取元素会抛出NoSuchElementException异常。
返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如 果是移除方法,则是从队列里取出一个元素,如果没有则返回null。
一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队列会阻塞住消费者线程,直到队列不为空。
超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的时间,生产者线程就会退出。
注意:如果有无界阻塞队列,队列永不会满,put或offer方法永不会被阻塞,使用offer方法永远返回true.
2.阻塞队列分类
主要的几种阻塞队列
1)普通阻塞队列:ArrayBlockingQueue,基于数组的有界阻塞队列 ;LinkedBlockingQueue ,基于单向链表的有界阻塞队列;LinkedBlockingDeque, 基于双向链表的有界双向阻塞队列。
2)优先级阻塞队列:PriorityBlockingQueue, 基于优先级排序无界阻塞队列。
3)延时阻塞队列:DelayQueue,基于优先级队列实现的无界阻塞队列。
4)其他阻塞队列:SynchronousQueu,不存储元素的阻塞队列;LinkedTransferQueue,基于单向链表组成的无界阻塞队列。
阻塞队列实现的基本原理:
阻塞队列主要是利用并发编程中的“等待/通知”模型,使用显式锁ReentrantLock和条件ConditionObject实现的。如下自定义的阻塞队列BoundBlockQueue,它使用一把锁和两个条件来实现阻塞队列。一个把锁lock,可以保护所有的访问,一个条件notEmpty用来通知线程当前队列“不满”,另一个条件notFull用来通知线程当前队列“不空”。上面介绍的7种阻塞队列其实现原理大致与此类似。
package thread; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class BoundBlockQueue<T> { private int addIndex = 0; private int removeIndex = 0; private int count = 0; private final Object[] items; private final Lock lock = new ReentrantLock(); private final Condition notEmpty = lock.newCondition(); private final Condition notFull = lock.newCondition(); public BoundBlockQueue(int size) { if (size < 0) throw new IllegalArgumentException("size must large than zero"); this.items = new Object[size]; } public void add(T o) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[addIndex++] = o; count++; if (addIndex == items.length) addIndex = 0; notEmpty.signal(); } finally { lock.unlock(); } } public T remove() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); Object o = items[removeIndex]; items[removeIndex++] = null; count--; if (removeIndex == items.length) removeIndex = 0; notFull.signal(); return (T) o; } finally { lock.unlock(); } } public static void main(String[] args) { BoundBlockQueue<String> queue = new BoundBlockQueue<>(9); /* new Thread(() -> { try { int i = 0; while(i<=5){ queue.add(String.valueOf(i)); System.out.println("add-digital: " + i); Thread.sleep(1000); i++; } } catch (InterruptedException e) { e.printStackTrace(); } }).start();*/ new Thread(() -> { try { int i = 1; while (i <= 10) { queue.add("0" + i); System.out.println("add-digital: 0" + i); Thread.sleep(500); i++; } } catch (InterruptedException e) { e.printStackTrace(); } }, "addItemThread1").start(); new Thread(() -> { try { int i = 0; while (i <= 8) { String s = queue.remove(); System.out.println("remove-digital:" + s); Thread.sleep(1000); i++; } } catch (InterruptedException e) { e.printStackTrace(); } }, "removeItemThread1").start(); } }
3.阻塞队列详述
1)普通阻塞队列
ArrayBlockingQueue和LinkedBlockingQueue都实现了Queue接口,表示先进先出的队列,头出尾进,而LinkedBlockingDeque实现了Deque接口,它表示一个双端队列,头尾均可进出。
这三个阻塞队列内部都是使用显式锁ReentrantLock和显式条件CoditionObject实现的。
(1)ArrayBlockingQueue
ArrayBlockingQueue是一个用数组实现的有界阻塞队列(构造方法必须指定容量) ,创建时指定容量大小,且运行时容量也不会变化。默认情况下不保证线程公平地访问队列。此队列按照先进先出(FIFO)的原则对元素进行排序。队列的容量和公平性选择均在构造方法中设定。
如下便创建了一个容量为50的公平阻塞队列。
ArrayBlockingQueue e = new ArrayBlockingQueue(50,true);
ArrayBlockingQueue的实现比较简单,其内部有一个数组存储元素,有两个索引分别表示头和尾,有一个变量表示当前元素个数,有一个锁保护所有的访问,有“不满‘和”不空“两个条件处理线程协作问题。
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -817911632652898426L; final Object[] items; int takeIndex; int putIndex; int count; final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; //.....省略 }
(2)LinkedBlockingQueue
LinkedBlockingQueue是基于单向链表实现的,在创建是可指定最大长度,也可不指定,默认是无限大。LinkedBlockingQueue不支锁的公平性选择,只支持非公平锁。
LinkedBlockingQueue queue=new LinkedBlockingQueue();
LinkedBlockingQueue queue2=new LinkedBlockingQueue(20);
LinkedBlockingQueue的实现与ArrayBlockingQueue有些不同,它是单向链表结构,尾进头出,为提高性能它将锁的粒度细化了,使用了两个锁,一个保护头部、一个保护尾部,每个锁绑定一个条件。
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private final int capacity; private final AtomicInteger count = new AtomicInteger(); transient Node<E> head; private transient Node<E> last; private final ReentrantLock takeLock = new ReentrantLock();//保护头部(出队)的锁 private final Condition notEmpty = takeLock.newCondition();//"不空"条件 private final ReentrantLock putLock = new ReentrantLock();//保护尾部(入队)的锁 private final Condition notFull = putLock.newCondition();//“不满”条件 //......省略 }
(3)LinkedBlockingDeque
LinkedBlockingDeque,基于双向链表实现,其最大长度可也是可选的,默认无限大。LinkedBlockingDeque不支锁的公平性选择,只支持非公平锁。
LinkedBlockingDeque deque = new LinkedBlockingDeque();
LinkedBlockingDeque deque2 = new LinkedBlockingDeque(20);
LinkedBlockingDeque,与ArrayBlockingQueue类似,也是使用一个锁,两个条件,使用锁保护所有操作,使用“不满‘和”不空“两个条件进行线程协作。
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable { transient Node<E> first; transient Node<E> last; private transient int count; private final int capacity; final ReentrantLock lock = new ReentrantLock(); //重入锁 private final Condition notEmpty = lock.newCondition();//"非空"条件 private final Condition notFull = lock.newCondition();//“非满”条件 //......省略 }
LinkedBlockingDeque是双向队列,它可以在队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。因其有两个插入/移除的入口,双向阻塞队列常用在Jork/Join框架的“工作窃取”模式中。
2)优先级阻塞队列
ProrityBlockingQueue是一个支持优先级的无界阻塞队列,(内部元素排列不是完全有序)它是按照优先级出队的(不像普通阻塞队列那样先进先出)。其内部使用数组来存储元素(逻辑上是堆,物理上是数组),它是无界的,数组的长度会动态扩展。它要求元素类型实现了Comparable接口或在构造方法中传入一个Comparator比较器,另外它不能保证同优先级元素的顺序。
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = 5595510919245408276L; private transient Object[] queue; //保存元素的数组 private transient int size; private transient Comparator<? super E> comparator; private final ReentrantLock lock; //锁 private final Condition notEmpty;//“不空”的条件 private transient volatile int allocationSpinLock; private PriorityQueue<E> q;//只有序列化/反序列化会用到,主要目的是与以前版本兼容。 //......省略 }
ProrityBlockingQueue不支锁的公平性选择,只支持非公平锁。ProrityBlockingQueue的实现,它有一个数组保存元素,它使用一把锁保护所有的操作,(因它是无界的,永不会“满”)只使用一个“不空”的条件进行线程协作。
3)延时阻塞队列
DelayQueue是一个支持延时获取元素的无界阻塞队列。它内部使用一个优先级队列来储存元素,使用一把锁来保护数据,用一个“可获取”的条件表示头部是否有元素可获取,当头部元素的延时未到时,take操作会根据延时计算需要休眠的时间,然后休眠,若此进程中有新的元素入队,且成为头部元素,则部位皮肤休眠的线程会被提前唤醒然后重新检查延时。DelayQueue不支锁的公平性选择,只支持非公平锁。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { private final transient ReentrantLock lock = new ReentrantLock(); private final PriorityQueue<E> q = new PriorityQueue<E>(); private Thread leader = null; private final Condition available = lock.newCondition(); //.....省略 }
它要求元素类型要实现Delay接口,而Delay接口双继承Comparable接口,那么DelayQueue中的每一个元素都是可比较的,额外的getDelay方法返回一个再延迟多长时间的整数(小于等于零就不再延迟)。
public interface Delayed extends Comparable<Delayed> { long getDelay(TimeUnit unit); }
DelayQueue常用于定时任务,因为DelayQueue是按照延时时间出队的。元素只有在过期后才能出队列中拿走,若没过期就要阻塞等待。
4)其他阻塞队列
SynchronousQueue和LinkedTransferQueue是两个特殊的阻塞队列。
(1)SynchronousQueue
SynchronousQueue,与其他队列不同,它没有存储元素的空间,它的每入队操作必须等待另一个线程的出队操作,反之亦然。
它支持公平访问队列(通过构造方法的参数指定),默认情况下线程采用非公平性策略访问队列
SynchronousQueue适用于两个线程之间直接传递信息、事件或任务。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。
(2)LinkedTransferQueue
LinkedTransferQueue实现了TransferQueue接口,TransferQueue扩展了BlockingQueue接口,增加了一此其他功能.生产者在往队列中放元素时,可以等待消费者接收后再返回,适用于消息传递类型的应用。
public interface TransferQueue<E> extends BlockingQueue<E> { //如果有消费者在等待,直接转给消费者,返回true,否则返回false,不入队 boolean tryTransfer(E e); //如果有消费者在等待,直接转给消费者,否则入队,阻塞等待直到被消费者接口后再返回 void transfer(E e) throws InterruptedException; //如果有消费者在等待,直接转给消费者,返回true,否则入队,阻塞等待限定时间,若最后被消费者接收,返回true boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException; //是否有消费者在等待 boolean hasWaitingConsumer(); //在等待的消费者个数 int getWaitingConsumerCount(); }
LinkedTransferQueue,与其他阻塞队列相比,主要是多了transfer系列方法, 且这几个方法核心逻辑都是委托给xfer方法实现的。
public void transfer(E e) throws InterruptedException { if (xfer(e, true, SYNC, 0) != null) { Thread.interrupted(); // failure possible only due to interrupt throw new InterruptedException(); } } public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null) return true; if (!Thread.interrupted()) return false; throw new InterruptedException(); } public boolean tryTransfer(E e) { return xfer(e, true, NOW, 0) == null; }
transfer(E)方法: 如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回.
tryTransfer(E)方法 : 与transfer类似,只这里只是“尝试”,若没有消费者在等待,它不会阻塞,会直接返回。
transfer(E,long,TimeUnit)方法: 与transfer类似,只是这里是个限时版本。
以上是关于并发编程中的阻塞队列概述的主要内容,如果未能解决你的问题,请参考以下文章
Java并发多线程编程——阻塞队列(BlockingQueue)
Java Review - 并发编程_ConcurrentLinkedQueue原理&源码剖析
Java Review - 并发编程_ConcurrentLinkedQueue原理&源码剖析