并发编程之ArrayBlockingQueue

Posted microhex

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程之ArrayBlockingQueue相关的知识,希望对你有一定的参考价值。

最近在学习并发编程,在看队列,今天先上一篇 ArrayBlockingArray。

文章目录

队列的概念:

是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,在表的尾端(end)进行插入操作。
和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称之为队尾,进行删除操作的端成为队头。

插入元素称之为入队,删除元素成为出队.

在列队中只允许一端插入,在另一端删除,所有只有最早进入队列的元素才能最先从队列中删除,所以队列中又称之为 先进先出(FIFO → First In First Out)

阻塞队列:

a. 支持阻塞的插入方法:意思就是当队列满时,队列会阻塞插入元素的线程,直到队列不满;

b. 支持阻塞的移除方法:意思在队列为空时,获取元素的线程会等待队列变为非空。

ArrayBlockingQueue

数组实现的有界阻塞队列,遵循FIFO原则,对元素进行排序。默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞的队列先访问。非公平性是对先等待的线程是非公平性的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的队列后访问队列。

ArrayBlockingQueue 构造方法


存在三个构造方法:
需要一个初始化容量,还可以定制可重入锁是否公平,最后还有一种是直接初始化数组内容的构造方式

	// 默认需要一个容量
	public ArrayBlockingQueue(int capacity) 
	        this(capacity, false);
	
	
	// 默认需要一个容量 
	// fair 规定可重入锁是公平还是不公平
	public ArrayBlockingQueue(int capacity, boolean fair) 
	        if (capacity <= 0)
	            throw new IllegalArgumentException();
	        this.items = new Object[capacity];
	        lock = new ReentrantLock(fair);
	        notEmpty = lock.newCondition();
	        notFull =  lock.newCondition();
	
	
	// 默认初始化数组内容
	public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) 
	        this(capacity, fair);
	
	        final ReentrantLock lock = this.lock;
	        lock.lock(); // Lock only for visibility, not mutual exclusion
	        try 
	            int i = 0;
	            try 
	                for (E e : c) 
	                    checkNotNull(e);
	                    items[i++] = e;
	                
	             catch (ArrayIndexOutOfBoundsException ex) 
	                throw new IllegalArgumentException();
	            
	            count = i;
	            putIndex = (i == capacity) ? 0 : i;
	         finally 
	            lock.unlock();
	        
	    

当然,还有其它的一些操作变量:

   /** items index for next take, poll, peek or remove */
	// 取出元素的位置
    int takeIndex;

    /** items index for next put, offer, or add */
	// 放入元素的位置
    int putIndex;

    /** Number of elements in the queue */
    // 元素的个数
    int count;

    /** Main lock guarding all access */
    // 可重入锁对象
    final ReentrantLock lock;

    /** Condition for waiting takes */
    // 不为空的信号量
    private final Condition notEmpty;

    /** Condition for waiting puts */
    // 队列不满的信号量
    private final Condition notFull;

队列操作方法

1. add方法

public boolean add(E e) 
   return super.add(e);

直接看其父类:

public boolean add(E e) 
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    

offer方法是抽象的,可以直接查看 ArrayBlockingQueue 的实现:
可以看到, offer方法是不能插入空值的

public boolean offer(E e) 
   checkNotNull(e);
   final ReentrantLock lock = this.lock;
   lock.lock();
   try 
      if (count == items.length)
           return false;
      else 
            enqueue(e);
            return true;
       
       finally 
         lock.unlock();
    

其中 enqueue 方法为,直接往数组添加数据:

private void enqueue(E x) 
  // assert lock.getHoldCount() == 1;
  // assert items[putIndex] == null;
  final Object[] items = this.items;
  items[putIndex] = x;
  if (++putIndex == items.length)
       putIndex = 0;
  count++;
  notEmpty.signal();

2. peek方法

使用了可重入锁,取出数组中 takeIndex处的元素:

public E peek() 
   final ReentrantLock lock = this.lock;
   lock.lock();
   try 
        return itemAt(takeIndex); // null when queue is empty
     finally 
        lock.unlock();
   

3. put / take 方法

是一对互斥操作,put是向队列中放入数据,take是向队列中取出数据, 但是和 add/peek 的实现方式不一样,其中 put方法为:

public void put(E e) throws InterruptedException 
       
    checkNotNull(e);

    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try 
       while (count == items.length)
             notFull.await();
       enqueue(e);
     finally 
      lock.unlock();
    

然后 take方法为:

public E take() throws InterruptedException 

   final ReentrantLock lock = this.lock;
   
   lock.lockInterruptibly();
   try 
            while (count == 0)
                notEmpty.await();
            return dequeue();
         finally 
            lock.unlock();
   

可以看出,put/take 方法是阻塞方法,put()会等待队列不满仓,take()会等待队列不为空.

4. offer / poll 方法

它们之间也是一对互斥操作,offer()是放入元素,poll()是取出元素.
offer()上面已经贴图看过,现在主要来看一下poll() 方法:

public E poll() 
  final ReentrantLock lock = this.lock;
  lock.lock();
  
  try 
     return (count == 0) ? null : dequeue();
   finally 
     lock.unlock();
  

5. offer(E e, long timeout, TimeUnit unit) / poll (long timeout, TimeUnit unit)

offerpoll 的加强版. 在队列满时指定了重试的时间,如果超过指定的时间后还是无法添加或取出则返回false。

直接看 offer(E e, long timeout, TimeUnit unit)方法:

public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException 

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();

        try 
            while (count == items.length) 
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            
            enqueue(e);
            return true;
         finally 
            lock.unlock();
        
    

对于 poll (long timeout, TimeUnit unit)方法:
在规定时间内重试,超过固定时间则直接返回null.

public E poll(long timeout, TimeUnit unit) throws InterruptedException 
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try 
            while (count == 0) 
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            
            return dequeue();
         finally 
            lock.unlock();
        
    

队列方法的比较

add系列方法比较:

peek系列方法比较:


其中,只有peek()方法只返回数组指定位置的元素,但是并没有移除该元素;
poll()take()poll() 如果在队列不为空的情况下,获取到元素时,同时也会从队列中移除该元素。

以上是关于并发编程之ArrayBlockingQueue的主要内容,如果未能解决你的问题,请参考以下文章

并发编程之ArrayBlockingQueue

深入剖析java并发之阻塞队列LinkedBlockingQueue与ArrayBlockingQueue

20.并发容器之ArrayBlockingQueue和LinkedBlockingQueue实现原理详解

并发队列之ArrayBlockingQueue

并发容器之ArrayBlockingQueue与LinkedBlockingQueue详解

java并发编程工具类JUC第二篇:ArrayBlockingQueue