java同步阻塞队列之ArrayBlockingQueue实现原理

Posted Leo Han

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java同步阻塞队列之ArrayBlockingQueue实现原理相关的知识,希望对你有一定的参考价值。

在java中我们可以用同步阻塞队列实现生产者-消费者模型。
ArrayBlockingQueue提供了阻塞队列功能,底层数据结构是基于数组,提供如下几个关键方法:

  • public boolean add(E e) 向队列中添加元素底层调用的是offer方法,如果添加成功返回true,否则抛出IllegalStateException异常
  • public boolean offer(E e),向队列中添加元素,如果队列 已经满了,返回false,否则添加元素
  • public void put(E e) throws InterruptedException 向队列中添加元素,如果队列满了,会阻塞等待
  • public E poll() 从队列头取出元素,如果队列空,不会等待,返回null
  • public E take() throws InterruptedException 从队列头取出元素,如果队列为空,会阻塞等待

接下来我们看看ArrayBlockingQueue中这几个方法的实现逻辑,再说实现逻辑之前,先看下其几个属性,

 	 // 底层结构数组,存储数据
    final Object[] items;
    // 读取数据数组下标
    int takeIndex;
    // 插入数据数组下标
    int putIndex;
    // 当前队列中数组元素个数
    int count;
    // 锁
    final ReentrantLock lock;
    // 条件队列
    private final Condition notEmpty;
    private final Condition notFull;
    
// 初始的时候传入容量,也可以指定锁是公平锁还是非公平锁,默认是非公平锁
public ArrayBlockingQueue(int capacity) 
        this(capacity, false);
    
    public ArrayBlockingQueue(int capacity, boolean fair) 
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    

另外,往队列的数组里面添加元素通过enqueue实现,获取元素通过dequeue实现

add

public boolean add(E e) 
        return super.add(e);
    
public boolean add(E e) 
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    
public boolean offer(E e) 
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try 
            if (count == items.length)
                return false;
            else 
                enqueue(e);
                return true;
            
         finally 
            lock.unlock();
        
    

可以明显的看到,当调用add时,最终是通过offer实现,offer操作时会上锁,如果数组已经满了,则直接返回false。

put

 public void put(E e) throws InterruptedException 
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 加锁
        lock.lockInterruptibly();
        try 
        	// 循环,当队列满了时,让出当前锁资源,当前线程等待
            while (count == items.length)
                notFull.await();
            enqueue(e);
         finally 
            lock.unlock();
        
    

take

public E take() throws InterruptedException 
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try 
        	// 当前队列空,让出资源等待
            while (count == 0)
                notEmpty.await();
            return dequeue();
         finally 
            lock.unlock();
        
    

poll

  public E poll() 
        final ReentrantLock lock = this.lock;
        lock.lock();
        try 
        	// 当前队列空,直接返回null
            return (count == 0) ? null : dequeue();
         finally 
            lock.unlock();
        
    

接下来看看入队和出队怎么实现的:

private void enqueue(E x) 
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    
private E dequeue() 
        final Object[] items = this.items;
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    

这里来简单分析下入队、出队逻辑,ArrayBlockingQueue中通过takeIndexputIndexcountnotFullnotEmpty,来标记当前数组的一些特性,数组中的元素是循环使用的:
count== 0 的时候,消费者通过otEmpty.await让当前线程等待,而生产者插入数据后则通过 notEmpty.signal唤醒消费者;如果插入的时候队列满了,通过notFull.await让生产者等待,当消费者消费获取数据之后,通过notEmpty.signal唤醒生产者线程
而消费和生产都是通过锁来消除并发带来的影响,生产者通过putIndex记录自己插入数据的位置,当putIndex == items.length时,putIndex=0,而消费者则通过takeIndex记录自己的消费位置,当takeIndex == items.length时,takeIndex=0

总结:ArrayBlockingQueue底层是基于数组的同步阻塞队列,数组的大小在运行期间不会发生改变,数组是会循环使用的,消费者和生产者各自维护一个putIndex 和takeIndex 数组指针来记录位置。通过一个lock和两个condition(notFull,notEmpty)来进行同步和提交控制。

以上是关于java同步阻塞队列之ArrayBlockingQueue实现原理的主要内容,如果未能解决你的问题,请参考以下文章

java同步阻塞队列之LinkedBlockingQueue实现原理,和ArrayBlockingQueue对比

java同步阻塞队列之DelayQueue实现原理,PriorityQueue原理

Java同步数据结构之ConcurrentLinkedQueue

Java同步数据结构之LinkedBlockingQueue

并发编程实践之公平有界阻塞队列实现

java阻塞队列 线程同步合作