java同步阻塞队列之ArrayBlockingQueue实现原理
Posted Leo Han
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java同步阻塞队列之ArrayBlockingQueue实现原理相关的知识,希望对你有一定的参考价值。
在java中我们可以用同步阻塞队列实现生产者-消费者模型。
ArrayBlockingQueue
提供了阻塞队列功能,底层数据结构是基于数组,提供如下几个关键方法:
- public boolean add(E e) 向队列中添加元素底层调用的是
offer
方法,如果添加成功返回true,否则抛出IllegalStateException异常 - public boolean offer(E e),向队列中添加元素,如果队列 已经满了,返回false,否则添加元素
- public void put(E e) throws InterruptedException 向队列中添加元素,如果队列满了,会阻塞等待
- public E poll() 从队列头取出元素,如果队列空,不会等待,返回null
- public E take() throws InterruptedException 从队列头取出元素,如果队列为空,会阻塞等待
接下来我们看看ArrayBlockingQueue中这几个方法的实现逻辑,再说实现逻辑之前,先看下其几个属性,
// 底层结构数组,存储数据
final Object[] items;
// 读取数据数组下标
int takeIndex;
// 插入数据数组下标
int putIndex;
// 当前队列中数组元素个数
int count;
// 锁
final ReentrantLock lock;
// 条件队列
private final Condition notEmpty;
private final Condition notFull;
// 初始的时候传入容量,也可以指定锁是公平锁还是非公平锁,默认是非公平锁
public ArrayBlockingQueue(int capacity)
this(capacity, false);
public ArrayBlockingQueue(int capacity, boolean fair)
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
另外,往队列的数组里面添加元素通过enqueue
实现,获取元素通过dequeue
实现
add
public boolean add(E e)
return super.add(e);
public boolean add(E e)
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
public boolean offer(E e)
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try
if (count == items.length)
return false;
else
enqueue(e);
return true;
finally
lock.unlock();
可以明显的看到,当调用add时,最终是通过offer
实现,offer操作时会上锁,如果数组已经满了,则直接返回false。
put
public void put(E e) throws InterruptedException
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 加锁
lock.lockInterruptibly();
try
// 循环,当队列满了时,让出当前锁资源,当前线程等待
while (count == items.length)
notFull.await();
enqueue(e);
finally
lock.unlock();
take
public E take() throws InterruptedException
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try
// 当前队列空,让出资源等待
while (count == 0)
notEmpty.await();
return dequeue();
finally
lock.unlock();
poll
public E poll()
final ReentrantLock lock = this.lock;
lock.lock();
try
// 当前队列空,直接返回null
return (count == 0) ? null : dequeue();
finally
lock.unlock();
接下来看看入队和出队怎么实现的:
private void enqueue(E x)
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
private E dequeue()
final Object[] items = this.items;
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
这里来简单分析下入队、出队逻辑,ArrayBlockingQueue
中通过takeIndex
、putIndex
、count
、notFull
、notEmpty
,来标记当前数组的一些特性,数组中的元素是循环使用的:
count== 0 的时候,消费者通过otEmpty.await让当前线程等待,而生产者插入数据后则通过 notEmpty.signal唤醒消费者;如果插入的时候队列满了,通过notFull.await让生产者等待,当消费者消费获取数据之后,通过notEmpty.signal唤醒生产者线程
而消费和生产都是通过锁来消除并发带来的影响,生产者通过putIndex记录自己插入数据的位置,当putIndex == items.length时,putIndex=0,而消费者则通过takeIndex记录自己的消费位置,当takeIndex == items.length时,takeIndex=0
总结:ArrayBlockingQueue底层是基于数组的同步阻塞队列,数组的大小在运行期间不会发生改变,数组是会循环使用的,消费者和生产者各自维护一个putIndex 和takeIndex 数组指针来记录位置。通过一个lock和两个condition(notFull,notEmpty)来进行同步和提交控制。
以上是关于java同步阻塞队列之ArrayBlockingQueue实现原理的主要内容,如果未能解决你的问题,请参考以下文章
java同步阻塞队列之LinkedBlockingQueue实现原理,和ArrayBlockingQueue对比
java同步阻塞队列之DelayQueue实现原理,PriorityQueue原理
Java同步数据结构之ConcurrentLinkedQueue