线程池队列--ArrayBlockingQueue详解

Posted 逃逸者 Gentle

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池队列--ArrayBlockingQueue详解相关的知识,希望对你有一定的参考价值。

前面介绍已经介绍了线程池的三种队列了,剩下要学习的队列已经没多少了,今天我们继续来学习一下另一个队列 ArrayBlockingQueue ,这个队列很简单,下面我们来看一下类图。

我们先来知道 ArrayBlockingQueue 是 BlockingQueue 的实现类,那我们需要先看看 BlockingQueue 提供了哪些方法。

public interface BlockingQueue<E> extends Queue<E> {
//将对象塞入队列,如果塞入成功返回true, 否则返回false。
boolean add(E e);

//将对象塞入到队列中,如果设置成功返回true, 否则返回false
boolean offer(E e);

//将元素塞入到队列中,如果队列中已经满了,
//则该方法会一直阻塞,直到队列中有多余的空间。
void put(E e) throws InterruptedException;

//将对象塞入队列并设置时间
//如果塞入成功返回 true, 否则返回 false.
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;

//从队列中取对象,如果队列中没有对象,
//线程会一直阻塞,直到队列中有对象,并且该方法取得了该对象。
E take() throws InterruptedException;

//在给定的时间里,从队列中获取对象,
//时间到了直接调用普通的poll方法,为null则直接返回null。
E poll(long timeout, TimeUnit unit)
throws InterruptedException;

//获取队列中剩余长度。
int remainingCapacity();

//从队列中移除指定的值。
boolean remove(Object o);

//判断队列中包含该对象。
public boolean contains(Object o);

//将队列中对象,全部移除,并加到传入集合中。
int drainTo(Collection<? super E> c);

//指定最多数量限制将队列中对,全部移除,并家到传入的集合中。
int drainTo(Collection<? super E> c, int maxElements);
}

下面我们再来看看该类的构造方法:

  /**
* capacity 表示数组中最大容量,默认使用非公平锁
*/

public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/**
* capacity 表示数组中最大容量
* fair 为 false 时使用非公平锁,true 时使用公平锁
*/

public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

/**
* capacity 表示数组中最大容量
* fair 为 false 时使用非公平锁,true 时使用公平锁
* c 初始化时,可以加入将我们有的集合加入该队列中
*/

public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);

final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e); //判空
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}

大致了解完上面的方法后,我们再来看看几个参数

	/** 队列中存放的值 */
final Object[] items;

/** 值的索引,这是取出位置的索引*/
int takeIndex;

/** 值的索引,这是插入位置的索引*/
int putIndex;

/** 队列中有多少个元素 */
int count;
  • items: 字段存储的是我们放入数组中的值

  • takeIndex:从数组中某个值的所在位置的索引

  • putIndex:放入数组中某个值所在位置的索引

  • count:表示数组中有多少个值


核心方法
这里仅介绍会阻塞的 take 和 put 方法,其他方法和前面讲的队列类似。

put 入队方法:

 public void put(E e) throws InterruptedException {
//判空
checkNotNull(e);
//显示锁
final ReentrantLock lock = this.lock;
//可中断锁
lock.lockInterruptibly();
try {
//判断队列元素是否以及满了,满了就阻塞
while (count == items.length)
notFull.await();
//入队方法
enqueue(e);
} finally {
//释放锁
lock.unlock();
}
}

上面就是阻塞入队操作的实现了,方法很简单,一看就懂。下面是具体入队操作,也没有太难理解的代码。但有一点需要注意。如下代码中,当插入值时发现这个值是队列最后一个位置,会将插入队列的下一个入队位置的索引设置为 0。这样一做,就变成了环形队列的意思了。

/** 变成环形队列 */
if (++putIndex == items.length)
   putIndex = 0;

 private void enqueue(E x) {
//拿到当前队列数组
final Object[] items = this.items;
//将值放入队列
items[putIndex] = x;
//判断队列是否以及满了
if (++putIndex == items.length)
//满了就将下一个入队索引设置为 0
putIndex = 0;
//队列中数量 +1
count++;
//唤醒其他阻塞的出队操作
notEmpty.signal();
}

下面我们来构造一个长度为 6 的环形数组,如下图是初始化情况:

刚刚开始构造完毕时,插入索引和取出索引都在同一个位置,数组内值为空。
线程池队列--ArrayBlockingQueue详解
接着,我们将元素 A 进行入队操作。元素 A 入队后,取出值的索引不会变化,但是入队索引会偏移到下一个位置,如下图所示:
线程池队列--ArrayBlockingQueue详解
我们继续进行入队操作,直到 putIndex 索引指向最后一个位置,如下图所示:
线程池队列--ArrayBlockingQueue详解
这时,我们将元素 F 进行入队,我们会看到,putIndex 索引和 takeIndex 索引,又到回了初始时的位置,这就是上面说的环形队列,此时如果再有线程进行入队操作时,线程便会堵塞,直到元素被消费。
线程池队列--ArrayBlockingQueue详解

说完入队的操作情况,我们在来看看出队操作是如何实现的

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//可中断锁
lock.lockInterruptibly();
try {
//判断队列是否为空
while (count == 0)
//线程阻塞
notEmpty.await();
//返回出队的值
return dequeue();
} finally {
//释放锁
lock.unlock();
}
}

出队操作实现:

 private E dequeue() {
//取得当前对象
final Object[] items = this.items;
//拿出队列中的值
E x = (E) items[takeIndex];
//将该出队索引指向位置的值置空
items[takeIndex] = null;
//如果出的是最后一个元素
if (++takeIndex == items.length)
//又回到原来的点
takeIndex = 0;
//出队值索引指向下一个位置
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}

下面我们用图解的形式来看看出队操作是如何完成的:

初始化情况是队列满的情况。
线程池队列--ArrayBlockingQueue详解
这时来了一个线程,将队列中第一个元素取走。此时,队列中 putIndex 索引没有变化,而 takeIndex 索引指向了下一个位置。
线程池队列--ArrayBlockingQueue详解
假设又来一条线程执行取出值操作,takeIdex 索引将会指向下一个位置。
线程池队列--ArrayBlockingQueue详解
我们继续进行出队操作,直到 takeIndex 索引指向最后一个位置,如下图所示:
线程池队列--ArrayBlockingQueue详解
再次进行取出时,我们可以看到似乎有一次回到了原点。这就环形队列队列。

额外:
上面就是介绍了环形阻塞队列的具体实现,总体来说没太大难度。但是这里需要补充一下代码设计的小细节,先看看下面两个代码小细节。

while (count == 0)
  notEmpty.await();

while (count == items.length)
   notFull.await();

可以看出主要用 while 循环来进行判断。既然是判断,那我们也可以使用 if 来进行判断。按正常来说使用 if 也没有问题。

但是这里不行,这里我们需要知道一个多线程的设计模式 –Guarded Suspension(保护性暂挂模式)

这种设计模式具体我还没学完,不过可以这样理解:假设我们使用 if 语句,那线程到此时被挂起,在被唤醒之后会继续往下执行,这是允许的。而 while 的话,被唤醒依旧在循环中,只有不满足的条件下才会继续向下执行。

以上是关于线程池队列--ArrayBlockingQueue详解的主要内容,如果未能解决你的问题,请参考以下文章

线程池的三种队列区别:SynchronousQueueLinkedBlockingQueue 和ArrayBlockingQueue

线程池的三种队列区别:SynchronousQueueLinkedBlockingQueue 和ArrayBlockingQueue

阻塞队列和线程池

如果你提交任务时,线程池队列已满,这时会发生什么?

如果你提交任务时,线程池队列已满,这时会发生什么?

4种线程池和7种并发队列