JUC多线程:阻塞队列ArrayBlockingQueue与LinkedBlockingQueue
Posted 张维鹏
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC多线程:阻塞队列ArrayBlockingQueue与LinkedBlockingQueue相关的知识,希望对你有一定的参考价值。
一、什么是阻塞队列:
阻塞队列最大的特性在于支持阻塞添加和阻塞删除方法:
-
阻塞添加:当阻塞队列已满时,队列会阻塞加入元素的线程,直到队列元素不满时才重新唤醒线程执行加入元素操作。
-
阻塞删除:但阻塞队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空再执行删除操作
Java 中的阻塞队列接口 BlockingQueue 继承自 Queue 接口,因此先来看看阻塞队列接口为我们提供的主要方法:
public interface BlockingQueue<E> extends Queue<E> {
// 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量)
// 在成功时返回 true,如果此队列已满,则抛IllegalStateException。
boolean add(E e);
// 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量)
// 如果该队列已满,则在到达指定的等待时间之前等待可用的空间,该方法可中断
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
//将指定的元素插入此队列的尾部,如果该队列已满,则一直等到(阻塞)。
void put(E e) throws InterruptedException;
//获取并移除此队列的头部,如果没有元素则等待(阻塞),直到有元素将唤醒等待线程执行该操作
E take() throws InterruptedException;
//获取并移除此队列的头部,在指定的等待时间前一直等到获取元素, //超过时间方法将结束
E poll(long timeout, TimeUnit unit) throws InterruptedException;
//从此队列中移除指定元素的单个实例(如果存在)。
boolean remove(Object o);
}
//除了上述方法还有继承自Queue接口的方法
//获取但不移除此队列的头元素,没有则跑异常NoSuchElementException
E element();
//获取但不移除此队列的头;如果此队列为空,则返回 null。
E peek();
//获取并移除此队列的头,如果此队列为空,则返回 null。
E poll();
这里我们把上述操作进行分类:
(1)插入方法:
- add(E e):添加成功返回 true,失败抛 IllegalStateException 异常
- offer(E e):成功返回 true,如果此队列已满,则返回 false
- put(E e):将元素插入此队列的尾部,如果该队列已满,则一直阻塞
(2)删除方法
- remove(Object o):移除指定元素,成功返回 true,失败返回 false
- poll():获取并移除此队列的头元素,若队列为空,则返回 null
- take():获取并移除此队列头元素,若没有元素则一直阻塞
(3)检查方法:
- element() :获取但不移除此队列的头元素,没有元素则抛异常
- peek() :获取但不移除此队列的头;若队列为空,则返回 null
二、阻塞队列的实现原理:
1、ArrayBlockingQueue:
1.1、数据结构:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** 存储数据的数组 */
final Object[] items;
/**获取数据的索引,主要用于take,poll,peek,remove方法 */
int takeIndex;
/**添加数据的索引,主要用于 put, offer, or add 方法*/
int putIndex;
/** 队列元素的个数 */
int count;
/** 控制并非访问的锁 */
final ReentrantLock lock;
/**notEmpty条件对象,用于通知take方法队列已有元素,可执行获取操作 */
private final Condition notEmpty;
/** notFull条件对象,用于通知put方法队列未满,可执行添加操作 */
private final Condition notFull;
/** 迭代器 */
transient Itrs itrs = null;
}
ArrayBlockingQueue 内部通过数组对象 items 来存储所有的数据,需要注意的是ArrayBlockingQueue 通过一个 ReentrantLock 来同时控制添加线程与移除线程的并发访问,这点与 LinkedBlockingQueue 区别很大(稍后会分析)。而对于 notEmpty 条件对象则是用于存放等待或唤醒调用 take() 方法的线程,告诉他们队列已有元素,可以执行获取操作。同理 notFull 条件对象是用于等待或唤醒调用 put() 方法的线程,告诉它们队列未满,可以执行添加元素的操作。takeIndex 代表的是下一个方法(take,poll,peek,remove)被调用时获取数组元素的索引,putIndex 则代表下一个方法(put, offer, or add)被调用时元素添加到数组中的索引。
1.2、阻塞添加:put()
put() 方法特点是阻塞添加,当队列满时通过条件对象来阻塞当前调用 put() 方法的线程,直到线程又再次被唤醒执行。总得来说添加线程的执行存在以下两种情况:一是队列已满,那么新到来的put 线程将添加到 notFull 的条件队列中等待;二是有移除线程执行移除操作,移除成功同时唤醒put线程。
具体代码如下:
//put方法,阻塞时可中断
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//该方法可中断
try {
//当队列元素个数与数组长度相等时,无法添加元素
while (count == items.length)
//将当前调用线程挂起,添加到notFull条件队列中等待唤醒
notFull.await();
enqueue(e);//如果队列没有满直接添加。。
} finally {
lock.unlock();
}
}
//入队操作
private void enqueue(E x) {
//获取当前数组
final Object[] items = this.items;
//通过putIndex索引对数组进行赋值
items[putIndex] = x;
//索引自增,如果已是最后一个位置,重新设置 putIndex = 0;
if (++putIndex == items.length)
putIndex = 0;
count++;//队列中元素数量加1
//唤醒调用take()方法的线程,执行元素获取操作。
notEmpty.signal();
}
1.3、阻塞删除:take()
take() 方法其实很简单,有就删除没有就阻塞,注意这个阻塞是可以中断的,如果队列没有数据那么就加入 notEmpty 条件队列等待(有数据就直接取走,方法结束),如果有新的put线程添加了数据,那么 put 操作将会唤醒 take 线程,执行 take 操作,图示如下:
具体代码如下:
//从队列头部删除,队列没有元素就阻塞,可中断
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//中断
try {
//如果队列没有元素
while (count == 0)
//执行阻塞操作
notEmpty.await();
return dequeue();//如果队列有元素执行删除操作
} finally {
lock.unlock();
}
}
//删除队列头元素并返回
private E dequeue() {
//拿到当前数组的数据
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//获取要删除的对象
E x = (E) items[takeIndex];
将数组中takeIndex索引位置设置为null
items[takeIndex] = null;
//takeIndex索引加1并判断是否与数组长度相等,
//如果相等说明已到尽头,恢复为0
if (++takeIndex == items.length)
takeIndex = 0;
count--;//队列个数减1
if (itrs != null)
itrs.elementDequeued();//同时更新迭代器中的元素数据
//删除了元素说明队列有空位,唤醒notFull条件对象添加线程,执行添加操作
notFull.signal();
return x;
}
2、LinkedBlockingQueue:
LinkedBlockingQueue 是一个基于链表的阻塞队列,其内部维持一个基于链表的数据队列,但大小默认值为 Integer.MAX_VALUE,建议使用 LinkedBlockingQueue时手动传值,避免队列过大造成机器负载或者内存爆满等情况
2.1、数据结构:
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**
* 节点类,用于存储数据
*/
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
/** 阻塞队列的大小,默认为Integer.MAX_VALUE */
private final int capacity;
/** 当前阻塞队列中的元素个数 */
private final AtomicInteger count = new AtomicInteger();
/** 阻塞队列的头结点 */
transient Node<E> head;
/** 阻塞队列的尾节点 */
private transient Node<E> last;
/** 获取并移除元素时使用的锁,如take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** notEmpty条件对象,当队列没有数据时用于挂起执行删除的线程 */
private final Condition notEmpty = takeLock.newCondition();
/** 添加元素时使用的锁如 put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** notFull条件对象,当队列数据已满时用于挂起执行添加的线程 */
private final Condition notFull = putLock.newCondition();
}
从上述可看成,每个添加到 LinkedBlockingQueue 队列中的数据都将被封装成 Node 节点,添加的链表队列中,其中 head 和 last 分别指向队列的头结点和尾结点。与 ArrayBlockingQueue 不同的是,LinkedBlockingQueue 内部分别使用了 takeLock 和 putLock 对并发进行控制,也就是说,添加和删除操作并不是互斥操作,可以同时进行,可以大大提高吞吐量。这里再次强调如果没有给 LinkedBlockingQueue 指定容量大小,其默认值将是 Integer.MAX_VALUE,如果存在添加速度大于删除速度时候,有可能会内存溢出。至于 LinkedBlockingQueue 的实现原理图与 ArrayBlockingQueue 是类似的,除了对添加和移除方法使用单独的锁控制外,两者都使用了不同的 Condition 条件对象作为等待队列,用于挂起 take 线程和 put 线程。
2.2、阻塞添加:put()
public void put(E e) throws InterruptedException {
//添加元素为null直接抛出异常
if (e == null) throw new NullPointerException();
int c = -1;
//构建节点
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
//获取队列的个数
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//判断队列是否已满,如果已满则阻塞当前线程
while (count.get() == capacity) {
notFull.await();
}
//添加元素并更新count值
enqueue(node);
c = count.getAndIncrement();
//如果队列容量还没满,唤醒下一个添加线程,执行添加操作
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//由于存在添加锁和消费锁,而消费锁和添加锁都会持续唤醒等待线程,因此count肯定会变化
//这里的if条件表示如果队列中还有1条数据,由于队列中存在数据那么就唤醒消费锁
if (c == 0)
signalNotEmpty();
}
这里的 put()方法做了两件事,第一件事是判断队列是否满,满了将当前线程加入等下队列,没满就将节点封装成 Node入队,然后再次判断队列添加完成后是否已满,不满就继续唤醒等到在条件对象 notFull 上的添加线程。第二件事是,判断是否需要唤醒等到在 notEmpty 条件对象上的消费线程。这里我们可能会有点疑惑,为什么添加完成后是继续唤醒在条件对象 notFull 上的添加线程而不是像 ArrayBlockingQueue 那样直接唤醒 notEmpty 条件对象上的消费线程?而又为什么要当 if (c == 0) 时才去唤醒消费线程呢?
- (1)唤醒添加线程的原因:在添加新元素完成后,会判断队列是否已满,不满就继续唤醒在条件对象 notFull 上的添加线程,这点与前面分析的 ArrayBlockingQueue 很不相同,在ArrayBlockingQueue 内部完成添加操作后,会直接唤醒消费线程对元素进行获取,这是因为ArrayBlockingQueue 只用了一个 ReenterLock 同时对添加线程和消费线程进行控制,这样如果在添加完成后再次唤醒添加线程的话,消费线程可能永远无法执行,而对于 LinkedBlockingQueue 来说就不一样了,其内部对添加线程和消费线程分别使用了各自的 ReenterLock 锁对并发进行控制,也就是说添加线程和消费线程是不会互斥的,所以添加锁只要管好自己的添加线程即可,添加线程自己直接唤醒自己的其他添加线程,如果没有等待的添加线程,直接结束了。如果有就直到队列元素已满才结束挂起,注意消费线程的执行过程也是如此。这也是为什么 LinkedBlockingQueue 的吞吐量要相对大些的原因。
- (2)为什么 if (c == 0) 时才去唤醒消费线程:这是因为消费线程一旦被唤醒,就一直处于消费的状态,直到队列为空才结束,所以 c 值是一直在变化的(c值是添加完元素前队列的大小),此时 c 只可能是等于0或大于0,如果是 c=0,那么说明之前消费线程已停止,条件对象上可能存在等待的消费线程,添加完数据后应该是 c+1,那么有数据就直接唤醒等待消费线程,如果没有就结束啦,等待下一次的消费操作。如果 c>0 那么消费线程就不会被唤醒,只能等待下一个消费操作(poll、take、remove)的调用,那为什么不是条件 c>0 才去唤醒呢?我们要明白的是消费线程一旦被唤醒会和添加线程一样,一直不断唤醒其他消费线程,如果添加前 c>0,那么很可能上一次调用的消费线程后,数据并没有被消费完,条件队列上也就不存在等待的消费线程了,所以 c>0 唤醒消费线程得意义不是很大,当然如果添加线程一直添加元素,那么一直 c>0,消费线程执行的换就要等待下一次调用消费操作了(poll、take、remove)
2.3、阻塞删除:take()
public E take() throws InterruptedException {
E x;
int c = -1;
//获取当前队列大小
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();//可中断
try {
//如果队列没有数据,挂机当前线程到条件对象的等待队列中
while (count.get() == 0) {
notEmpty.await();
}
//如果存在数据直接删除并返回该数据
x = dequeue();
c = count.getAndDecrement();//队列大小减1
if (c > 1)
notEmpty.signal();//还有数据就唤醒后续的消费线程
} finally {
takeLock.unlock();
}
//满足条件,唤醒条件对象上等待队列中的添加线程
if (c == capacity)
signalNotFull();
return x;
}
take() 方法是一个可阻塞可中断的移除方法,主要做了两件事,一是,如果队列没有数据就挂起当前线程到 notEmpty 条件对象的等待队列中一直等待,如果有数据就删除节点并返回数据项,同时唤醒后续消费线程,二是尝试唤醒条件对象 notFull 上等待队列中的添加线程。
3、ArrayBlockingQueue 和 LinkedBlockingQueue 迥异:
通过上述的分析,对于 ArrayBlockingQueue 和 LinkedBlockingQueue 的基本使用以及内部实现原理我们已较为熟悉了,这里我们就对它们两间的区别来个小结:
- (1)队列大小有所不同,ArrayBlockingQueue 是有界的初始化必须指定大小,而LinkedBlockingQueue 可以是有界的也可以是无界的(默认是 Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题
- (2)数据存储容器不同,ArrayBlockingQueue 采用的是数组作为数据存储容器,而LinkedBlockingQueue 采用的则是以 Node 节点作为连接对象的链表
- (3)创建与销毁对象的开销不同,ArrayBlockingQueue 采用数组作为存储容器,在插入或删除元素时不会产生或销毁任何额外的对象实例,而 LinkedBlockingQueue 则会生成一个额外的 Node 对象。在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。
- (4)队列添加或移除的锁不一样,ArrayBlockingQueue 的锁是没有分离的,添加操作和移除操作采用同一个 ReenterLock 锁,而 LinkedBlockingQueue 的锁是分离的,添加采用的是 putLock,移除采用的是 takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
参考文章:https://blog.csdn.net/javazejian/article/details/77410889
以上是关于JUC多线程:阻塞队列ArrayBlockingQueue与LinkedBlockingQueue的主要内容,如果未能解决你的问题,请参考以下文章