并发队列之ArrayBlockingQueue
Posted wyq1995
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发队列之ArrayBlockingQueue相关的知识,希望对你有一定的参考价值。
上一篇我们说了并发队列中的LinkedBlockingQueue队列,这次我们看看ArrayBlockingQueue,看看名字,我们想象一下LinkedList和ArrayList的区别,我们可以知道ArrayBlockingQueue底层肯定是基于数组实现的,这是一个有界数组;
ArrayBlockingQueue其中的组成部分和LinkedBlockingQueue及其相似,也是有两个条件变量,维护阻塞队列,实现了生产消费者模式;
一.简单认识ArrayBlockingQueue
先看看几个常用属性:
//数组用于存放队列元素 final Object[] items; //出队索引 int takeIndex; //入队索引 int putIndex; //队列中元素数量 int count; //独占锁 final ReentrantLock lock; //如果数组中为空,还有线程取数据,就丢到这个条件变量中来阻塞 private final Condition notEmpty; //队列满了,还有线程往数组中添加数据,就把线程丢到这里来阻塞 private final Condition notFull;
由于这是一个有界的数组,我们再看看构造器:
//指定容量,默认是非公平策略 public ArrayBlockingQueue(int capacity) { this(capacity, false); } //指定容量和独占锁的策略 public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } //可以指定容量,锁的策略,还有初始化数据 public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
二.offer方法
向队列尾部添加一个元素,添加成功就返回true,队列满了就丢掉当前元素直接返回false,方法不阻塞;
public boolean offer(E e) { //非空检验 checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { //如果数组中实际数量和最大容量相等,添加失败,返回false if (count == items.length) return false; else { //添加成功,方法实现在下面 enqueue(e); return true; } } finally { //释放锁 lock.unlock(); } } private void enqueue(E x) { //拿到数组 final Object[] items = this.items; //在putIndex这个位置放入数据x,然后把putIndex加一,说明这个参数表示的是下一个数据要放入的位置的索引 items[putIndex] = x; //这里putIndex是先加一然后再比较是否相等,比如这里数组的最大容量是5,那么索引的最大值应该是4,而如果putIndex等于5了,说明数组 //越界了,加把这个索引重置为0 if (++putIndex == items.length) putIndex = 0; count++; //添加完成之后,说明了数组中有数据了,这里会唤醒之前因为去数组中取数据而阻塞的线程 notEmpty.signal(); }
三.put方法
向队列尾部插入一个元素,队列有空闲就插入成功返回true,队列满了就阻塞当前线程到notFull的条件队列中,等有空闲之后就会被唤醒;阻塞过程中对中断会有响应的;
public void put(E e) throws InterruptedException { //非空检查 checkNotNull(e); final ReentrantLock lock = this.lock; //注意该锁的获取方式 lock.lockInterruptibly(); try { //如果线程满了,就把当前线程放到notFull条件变量的阻塞队列中 while (count == items.length) notFull.await(); //没有满,就添加数据 enqueue(e); } finally { //释放锁 lock.unlock(); } }
四.poll方法
头部获取并移除一个元素,如果队列为空,就返回null,方法不阻塞;
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { //如果队列为空,就返回null //如果队列不为空,就调用dequeue方法获取并删除队列头部的元素 return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } private E dequeue() { //获取数组 final Object[] items = this.items; @SuppressWarnings("unchecked") //获取takeIndex位置的元素,最后会将这个返回 E x = (E) items[takeIndex]; //然后将takeInde位置置为空 items[takeIndex] = null; //如果takeIndex已经是数组的最后一个位置了,就将takeIndex重置为0 if (++takeIndex == items.length) takeIndex = 0; //实际数量减一 count--; if (itrs != null) itrs.elementDequeued(); //唤醒notFull中线程 notFull.signal(); return x; }
五.take方法
获取并删除当前队列头部的元素,如果队列为空当前线程阻塞直到被唤醒,对中断有响应;
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; //可中断的方式获取锁 lock.lockInterruptibly(); try { //如果数组为空,此时就唤醒notEmpty中条件队列里的线程 while (count == 0) notEmpty.await(); //获取并删除头节点 return dequeue(); } finally { lock.unlock(); } }
六.peek方法
只是获取头部元素,不删除,如果队列为空就返回null,这个方法是线程不阻塞的
public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return itemAt(takeIndex); // null when queue is empty } finally { lock.unlock(); } } //获取到数组中索引为takeIndex中的数据 @SuppressWarnings("unchecked") final E itemAt(int i) { return (E) items[i]; }
七.总结
理解了上一篇博客中说的LinkedBlockingQueue,那么再看这一篇其实太容易了,就是操作数组嘛!用下面这个图表示:
以上是关于并发队列之ArrayBlockingQueue的主要内容,如果未能解决你的问题,请参考以下文章
深入剖析java并发之阻塞队列LinkedBlockingQueue与ArrayBlockingQueue