ArrayBlockingQueue源码阅读(1.8)
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ArrayBlockingQueue源码阅读(1.8)相关的知识,希望对你有一定的参考价值。
ArrayBlockingQueue源码阅读
1、ArrayBlockingQueue类结构
??public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable。ArrayBlockingQueue是BlockingQueue接口的一种实现,要了解它就必须清楚BlockingQueue的相关知识;
2、BlockingQueue接口介绍
??在并发队列上JDK提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队列,一个是以BlockingQueue接口为代表的阻塞队列,无论哪种都继承自Queue接口!,BlockingQueue的类继承关系如下:
BlockingQueue接口重要方法如下:
- offer(anObject): 表示如果可能的话, 将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳, 则返回true, 否则返回false.(本方法不阻塞当前执行方法的线程)。
- offer(E o, long timeout, TimeUnit unit), 可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
- put(anObject): 把anObject加到BlockingQueue里, 如果BlockQueue没有空间, 则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续。
- poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败,如果不指定超时时间,在没有数据时立即返回失败。
- take(): 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入。
- drainTo(): 一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
3、源码分析
3.1、类属性查看
/** The queued items */ 以数组作为数据结构
final Object[] items;
/** items index for next take, poll, peek or remove */ 队列中下一个将被取出值的下标
int takeIndex;
/** items index for next put, offer, or add */ 队列中下一个将被放入值的下标
int putIndex;
/** Number of elements in the queue */ 数组元素数量
int count;
/*
* Concurrency control uses the classic two-condition algorithm 使用双条件算法
* found in any textbook.
*/
/** Main lock guarding all access */ 使用重入锁(独占锁)
final ReentrantLock lock;
/** Condition for waiting takes */ take时候用于等待的条件
private final Condition notEmpty;
/** Condition for waiting puts */ put时候用于等待的条件
private final Condition notFull;
3.2、构造函数分析
/**
- Creates an {@code ArrayBlockingQueue} with the given (fixed)
- capacity and default access policy.
- @param capacity the capacity of this queue
-
@throws IllegalArgumentException if {@code capacity < 1}
*/public ArrayBlockingQueue(int capacity) { this(capacity, false); //调用public ArrayBlockingQueue(int capacity, boolean fair)构造方法,默认使用非公平锁 }
/**
- Creates an {@code ArrayBlockingQueue} with the given (fixed)
- capacity and the specified access policy.
- @param capacity the capacity of this queue
- @param fair if {@code true} then queue accesses for threads blocked
- on insertion or removal, are processed in FIFO order; //如果传入的值为true即公平锁,则需要维护一个有序队列,保证先进先出的原则
- if {@code false} the access order is unspecified.
- @throws IllegalArgumentException if {@code capacity < 1}
*/public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; //创建指定容量的数组 lock = new ReentrantLock(fair); //默认使用非公平锁 notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
/**
- Creates an {@code ArrayBlockingQueue} with the given (fixed)
- capacity, the specified access policy and initially containing the
- elements of the given collection,
- added in traversal order of the collection‘s iterator.
- @param capacity the capacity of this queue
- @param fair if {@code true} then queue accesses for threads blocked
- on insertion or removal, are processed in FIFO order;
- if {@code false} the access order is unspecified.
- @param c the collection of elements to initially contain 使用指定集合初始化队列
- @throws IllegalArgumentException if {@code capacity} is less than
- {@code c.size()}, or less than 1.
- @throws NullPointerException if the specified collection or any
- of its elements are null
*/
//这个构造函数的核心就是c.size()与capacity的大小关系对比了
//如果c.size()>capacity那就会报错,所以在初始化的时候要注意
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair); //先创建指定容量的数组,以便集合中的元素存放
//这种写法我们很常见,使用final表示引用不能改变,但又避免了直接使用成员变量
final ReentrantLock lock = this.lock;
//对队列直接修改操作,需要先获取独占锁
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e; //下标从0开始存放
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i; //将数组元素个数返回给全局变量
putIndex = (i == capacity) ? 0 : i; //修改下一次将被放入值的下标
} finally {
lock.unlock(); //解锁
}
}
以上是关于ArrayBlockingQueue源码阅读(1.8)的主要内容,如果未能解决你的问题,请参考以下文章
源码阅读(32):Java中线程安全的QueueDeque结构——ArrayBlockingQueue
J.U.C并发框架源码阅读ArrayBlockingQueue
源码阅读(34):Java中线程安全的QueueDeque结构——ArrayBlockingQueue
源码阅读(34):Java中线程安全的QueueDeque结构——ArrayBlockingQueue