Java Review - 并发编程_ArrayBlockingQueue原理&源码剖析
Posted 小小工匠
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java Review - 并发编程_ArrayBlockingQueue原理&源码剖析相关的知识,希望对你有一定的参考价值。
概述
Java Review - 并发编程_LinkedBlockingQueue原理&源码剖析
介绍了使用有界链表方式实现的阻塞队列LinkedBlockingQueue,这里我们继续来研究使用有界数组方式实现的阻塞队列ArrayBlockingQueue的原理。
类图结构
由该图可以看出,ArrayBlockingQueue
- 内部有一个数组items,用来存放队列元素
- putindex变量表示入队元素下标
- takeIndex是出队下标
- count统计队列元素个数
从定义可知,这些变量并没有使用volatile修饰,这是因为访问这些变量都是在锁块内,而加锁已经保证了锁块内变量的内存可见性了。
另外有个独占锁lock用来保证出、入队操作的原子性,这保证了同时只有一个线程可以进行入队、出队操作。
另外,notEmpty、notFull条件变量用来进行出、入队的同步。
构造函数
ArrayBlockingQueue是有界队列,所以构造函数必须传入队列大小参数。
构造函数的代码如下。
public ArrayBlockingQueue(int capacity)
this(capacity, false);
public ArrayBlockingQueue(int capacity, boolean fair)
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c)
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try
int i = 0;
try
for (E e : c)
checkNotNull(e);
items[i++] = e;
catch (ArrayIndexOutOfBoundsException ex)
throw new IllegalArgumentException();
count = i;
putIndex = (i == capacity) ? 0 : i;
finally
lock.unlock();
由以上代码可知,在默认情况下使用ReentrantLock提供的非公平独占锁进行出、入队操作的同步。
主要方法源码解析
研究过LinkedBlockingQueue的实现后再看ArrayBlockingQueue的实现会感觉后者简单了很多
offer操作
向队列尾部插入一个元素,如果队列有空闲空间则插入成功后返回true,如果队列已满则丢弃当前元素然后返回false。
如果e元素为null则抛出NullPointerException异常。
另外,该方法是不阻塞的。
public boolean offer(E e)
// 1
checkNotNull(e);
// 2
final ReentrantLock lock = this.lock;
lock.lock();
try
// 3
if (count == items.length)
return false;
else
// 4
enqueue(e);
return true;
finally
lock.unlock();
- 代码(1) 如果e元素为null则抛出NullPointerException异常
- 代码(2)获取独占锁,当前线程获取该锁后,其他入队和出队操作的线程都会被阻塞挂起而后被放入lock锁的AQS阻塞队列。
- 代码(3)判断如果队列满则直接返回false,否则调用enqueue方法后返回true,enqueue的代码如下
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E x)
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
// 6 元素入队
final Object[] items = this.items;
items[putIndex] = x;
// 7 计算下一个元素应该存放的下标位置
if (++putIndex == items.length)
putIndex = 0;
count++;
// 8
notEmpty.signal();
如上代码首先把当前元素放入items数组,然后计算下一个元素应该存放的下标位置,并递增元素个数计数器,最后激活notEmpty的条件队列中因为调用take操作而被阻塞的一个线程。
这里由于在操作共享变量count前加了锁,所以不存在内存不可见问题,加过锁后获取的共享变量都是从主内存获取的,而不是从CPU缓存或者寄存器获取。
代码(5)释放锁,然后会把修改的共享变量值(比如count的值)刷新回主内存中,这样其他线程通过加锁再次读取这些共享变量时,就可以看到最新的值。
put操作
向队列尾部插入一个元素,如果队列有空闲则插入后直接返回true,如果队列已满则阻塞当前线程直到队列有空闲并插入成功后返回true,如果在阻塞时被其他线程设置了中断标志,则被阻塞线程会抛出InterruptedException异常而返回。
另外,如果e元素为null则抛出NullPointerException异常。
public void put(E e) throws InterruptedException
// 1
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 2 获取锁 可被中断
lock.lockInterruptibly();
try
// 3 如果队列满,这把当前下层放入notFull管理的条件队列
while (count == items.length)
notFull.await();
// 4 插入队列
enqueue(e);
finally
// 5
lock.unlock();
-
在代码(2)中,在获取锁的过程中当前线程被其他线程中断了,则当前线程会抛出InterruptedException异常而退出。
-
代码(3)判断如果当前队列已满,则把当前线程阻塞挂起后放入notFull的条件队列,注意这里也是使用了while循环而不是if语句。\\
-
代码(4)判断如果队列不满则插入当前元素,此处不再赘述。
poll操作
从队列头部获取并移除一个元素,如果队列为空则返回null,该方法是不阻塞的。
public E poll()
// 1
final ReentrantLock lock = this.lock;
lock.lock();
try
// 2
return (count == 0) ? null : dequeue();
finally
lock.unlock();
-
代码(1)获取独占锁。
-
代码(2)判断如果队列为空则返回null,否则调用dequeue()方法
private E dequeue()
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 4 获取元素
E x = (E) items[takeIndex];
// 5 数组中的值为null
items[takeIndex] = null;
// 6 对头指针计算,队列元素个数减一
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 7 发送信号激活notFull条件队列中的一个线程
notFull.signal();
return x;
由以上代码可知,首先获取当前队头元素并将其保存到局部变量,然后重置队头元素为null,并重新设置队头下标,递减元素计数器,最后发送信号激活notFull的条件队列里面一个因为调用put方法而被阻塞的线程。
take操作
获取当前队列头部元素并从队列里面移除它。如果队列为空则阻塞当前线程直到队列不为空然后返回元素,如果在阻塞时被其他线程设置了中断标志,则被阻塞线程会抛出InterruptedException异常而返回。
public E take() throws InterruptedException
// 1
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try
// 2 队列为空则等待,直到队列中有数据
while (count == 0)
notEmpty.await();
// 3 获取头部元素
return dequeue();
finally
// 4
lock.unlock();
take操作的代码也比较简单,与poll相比只是代码(2)不同。
在这里,如果队列为空则把当前线程挂起后放入notEmpty的条件队列,等其他线程调用notEmpty.signal()方法后再返回。
需要注意的是,这里也是使用while循环进行检测并等待而不是使用if语句。
peek操作
获取队列头部元素但是不从队列里面移除它,如果队列为空则返回null,该方法是不阻塞的。
public E peek()
// 1
final ReentrantLock lock = this.lock;
lock.lock();
try
// 2
return itemAt(takeIndex); // null when queue is empty
finally
// 3
lock.unlock();
final E itemAt(int i)
return (E) items[i];
peek的实现更简单,首先获取独占锁,然后从数组items中获取当前队头下标的值并返回,在返回前释放获取的锁。
size
计算当前队列元素个数。
public int size()
final ReentrantLock lock = this.lock;
lock.lock();
try
return count;
finally
lock.unlock();
size操作比较简单,获取锁后直接返回count,并在返回前释放锁。
也许你会问,这里又没有修改count的值,只是简单地获取,为何要加锁呢?
其实如果count被声明为volatile的这里就不需要加锁了,因为volatile类型的变量保证了内存的可见性,而ArrayBlockingQueue中的count并没有被声明为volatile的,这是因为count操作都是在获取锁后进行的,
而获取锁的语义之一是,获取锁后访问的变量都是从主内存获取的,这保证了变量的内存可见性。
小结
-
ArrayBlockingQueue通过使用全局独占锁实现了同时只能有一个线程进行入队或者出队操作,这个锁的粒度比较大,有点类似于在方法上添加synchronized的意思。
-
其中offer和poll操作通过简单的加锁进行入队、出队操作,
-
而put、take操作则使用条件变量实现了,如果队列满则等待,如果队列空则等待,然后分别在出队和入队操作中发送信号激活等待线程实现同步。
-
另外,相比LinkedBlockingQueue,ArrayBlockingQueue的size操作的结果是精确的,因为计算前加了全局锁。
以上是关于Java Review - 并发编程_ArrayBlockingQueue原理&源码剖析的主要内容,如果未能解决你的问题,请参考以下文章