Java中的阻塞队列
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java中的阻塞队列相关的知识,希望对你有一定的参考价值。
在集合框架里,想必大家都用过ArrayList和LinkedList,也经常在面试中问到他们之间的区别。ArrayList和ArrayBlockingQueue一样,内部基于数组来存放元素,而LinkedBlockingQueue则和LinkedList一样,内部基于链表来存放元素。
LinkedBlockingQueue实现了BlockingQueue接口,这里放一张类的继承关系图(图片来自之前的文章:说说队列Queue)
LinkedBlockingQueue不同于ArrayBlockingQueue,它如果不指定容量,默认为Integer.MAX_VALUE
,也就是无界队列。所以为了避免队列过大造成机器负载或者内存爆满的情况出现,我们在使用的时候建议手动传一个队列的大小。
2、源码分析
2.1 属性
/** * 节点类,用于存储数据 */ static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } } /** 阻塞队列的大小,默认为Integer.MAX_VALUE */ private final int capacity; /** 当前阻塞队列中的元素个数 */ private final AtomicInteger count = new AtomicInteger(); /** * 阻塞队列的头结点 */ transient Node<E> head; /** * 阻塞队列的尾节点 */ private transient Node<E> last; /** 获取并移除元素时使用的锁,如take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** notEmpty条件对象,当队列没有数据时用于挂起执行删除的线程 */ private final Condition notEmpty = takeLock.newCondition(); /** 添加元素时使用的锁如 put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** notFull条件对象,当队列数据已满时用于挂起执行添加的线程 */ private final Condition notFull = putLock.newCondition();
从上面的属性我们知道,每个添加到LinkedBlockingQueue队列中的数据都将被封装成Node节点,添加的链表队列中,其中head和last分别指向队列的头结点和尾结点。与ArrayBlockingQueue不同的是,LinkedBlockingQueue内部分别使用了takeLock 和 putLock 对并发进行控制,也就是说,添加和删除操作并不是互斥操作,可以同时进行,这样也就可以大大提高吞吐量。
这里如果不指定队列的容量大小,也就是使用默认的Integer.MAX_VALUE,如果存在添加速度大于删除速度时候,有可能会内存溢出,这点在使用前希望慎重考虑。
另外,LinkedBlockingQueue对每一个lock锁都提供了一个Condition用来挂起和唤醒其他线程。
构造函数
public LinkedBlockingQueue() { // 默认大小为Integer.MAX_VALUE this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
默认的构造函数和最后一个构造函数创建的队列大小都为Integer.MAX_VALUE,只有第二个构造函数用户可以指定队列的大小。第二个构造函数最后初始化了last和head节点,让它们都指向了一个元素为null的节点。
方法
同样,LinkedBlockingQueue也有着和ArrayBlockingQueue一样的方法,我们先来看看入队列的方法。
2.3.1、入队方法
LinkedBlockingQueue提供了多种入队操作的实现来满足不同情况下的需求,入队操作有如下几种:
- void put(E e);
- boolean offer(E e);
- boolean offer(E e, long timeout, TimeUnit unit)。
put(E e)
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 获取锁中断 putLock.lockInterruptibly(); try { //判断队列是否已满,如果已满阻塞等待 while (count.get() == capacity) { notFull.await(); } // 把node放入队列中 enqueue(node); c = count.getAndIncrement(); // 再次判断队列是否有可用空间,如果有唤醒下一个线程进行添加操作 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } // 如果队列中有一条数据,唤醒消费线程进行消费 if (c == 0) signalNotEmpty(); }
小结put方法来看,它总共做了以下情况的考虑:
- 队列已满,阻塞等待。
- 队列未满,创建一个node节点放入队列中,如果放完以后队列还有剩余空间,继续唤醒下一个添加线程进行添加。如果放之前队列中没有元素,放完以后要唤醒消费线程进行消费。
offer(E e)
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { // 队列有可用空间,放入node节点,判断放入元素后是否还有可用空间, // 如果有,唤醒下一个添加线程进行添加操作。 if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; }
可以看到offer仅仅对put方法改动了一点点,当队列没有可用元素的时候,不同于put方法的阻塞等待,offer方法直接方法false。
offer(E e, long timeout, TimeUnit unit)
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { // 等待超时时间nanos,超时时间到了返回false while (count.get() == capacity) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; }
该方法只是对offer方法进行了阻塞超时处理,使用了Condition的awaitNanos来进行超时等待,这里为什么要用while循环?因为awaitNanos方法是可中断的,为了防止在等待过程中线程被中断,这里使用while循环进行等待过程中中断的处理,继续等待剩下需等待的时间。
以上是关于Java中的阻塞队列的主要内容,如果未能解决你的问题,请参考以下文章