ArrayBlockingQueue源码解析
Posted 醉酒的小男人
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ArrayBlockingQueue源码解析相关的知识,希望对你有一定的参考价值。
什么是阻塞队列
队列是一种只允许在一端进行删除操作,在另一端进行插入操作的线性表,允许插入的一端称为队尾、允许删除的一端称为队头。在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。Java中,BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等。
常用阻塞队列
ArrayBlockingQueue
一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁。
LinkedBlockingQueue
一个由链表结构组成的有界队列,此队列的长度为Integer.MAX_VALUE。此队列按照先进先出的顺序进行排序。
SynchronousQueue
是一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue的一个使用场景是在线程池里。
Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
LinkedTransferQueue
是 一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue队列多了transfer和tryTransfer方法。
PriorityBlockingQueue
是一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。
DelayQueue
一个实现PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。
使用场景:
1.定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。
2.缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
LinkedBlockingDeque
一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。
常用方法 抛出异常
是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。
返回特殊值
插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null。
一直阻塞
当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
超时退出
当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。
源码分析
常用属性
//元素数组
final Object[] items;
// 取元素的指针
int takeIndex;
// 放元素的指针
int putIndex;
// 元素数量
int count;
//集合锁
final ReentrantLock lock;
//非空条件等待队列
private final Condition notEmpty;
//未满条件等待队列
private final Condition notFull;
代码分析
package com;
import java.util.concurrent.ArrayBlockingQueue;
public class ArrayBlockQueueTest {
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
queue.put("aaa");
queue.take();
}
}
构造方法
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
//初始化数组容量为capacity
this.items = new Object[capacity];
//初始化锁,true为公平锁,false为非公平锁
lock = new ReentrantLock(fair);
//初始化两个条件队列
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
put方法
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
//入队时,先加锁,如果被打断会抛异常出来
lock.lockInterruptibly();
try {
/*
*当队列已经放满时,则把当前线程放入,未满条件等待队列,挂起线程
*当线程被唤醒时,则再去看队列是否放满
*未放满,则元素入队,放满则再继续挂起
*/
while (count == items.length)
notFull.await();
//当队列未满时,则把元素入队,
enqueue(e);
} finally {
//添加完成释放锁
lock.unlock();
}
}
take方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//出队时,先加锁
lock.lockInterruptibly();
try {
/*
*当队列为空时,则把该线程放入非空条件等待队列,挂起该线程
*当线程被唤醒时,则再去看队列是否非空
*有值,则出队,没有值,则再次挂起该线程
*/
while (count == 0)
notEmpty.await();
//出队
return dequeue();
} finally {
lock.unlock();
}
}
dequeue方法
private void enqueue(E x) {
final Object[] items = this.items;
//把元素放入索引为‘放指针’的位置
items[putIndex] = x;
//放索引加一,当如果,指针已经到了数组尾部,则指针又回到数组头部,就是重置为0
if (++putIndex == items.length)
putIndex = 0;
//队列元素长度加一
count++;
//当元素已经放入,则通知非空条件队列,唤醒该队列线程可以
notEmpty.signal();
}
涉及技术:
ReentrantLock源码分析
Condition源码分析
总结
该队列使用的是数组来存放数据,如果当数据放满后,使用put方法则,阻塞住该线程,把该线程移入到未满条件队列中,等待取数据后被唤醒;如果取数据时,队列为空,则把该线程放入非空条件队列,等待放数据后被唤醒;因为使用的是数组,用写指针和读指针来标记位置,如果到了队尾,则会重新跳到队头;相当于是一个环;
以上是关于ArrayBlockingQueue源码解析的主要内容,如果未能解决你的问题,请参考以下文章
ArrayBlockingQueue源码解析(基于JDK8)
Java Review - 并发编程_ArrayBlockingQueue原理&源码剖析