并发阻塞队列BlockingQueue解读

Posted 热爱编程的大忽悠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发阻塞队列BlockingQueue解读相关的知识,希望对你有一定的参考价值。

并发阻塞队列BlockingQueue解读


引言

首先,最基本的来说, BlockingQueue 是一个先进先出的队列(Queue),为什么说是阻塞(Blocking)的呢?是因为 BlockingQueue 支持当获取队列元素但是队列为空时,会阻塞等待队列中有元素再返回;也支持添加元素时,如果队列已满,那么等到队列可以放入新元素时再放入。

BlockingQueue 是一个接口,继承自 Queue,所以其实现类也可以作为 Queue 的实现来使用,而 Queue 又继承自 Collection 接口。

BlockingQueue 对插入操作、移除操作、获取元素操作提供了四种不同的方法用于不同的场景中使用:1、抛出异常;2、返回特殊值(null 或 true/false,取决于具体的操作);3、阻塞等待此操作,直到这个操作成功;4、阻塞等待此操作,直到成功或者超时指定时间。总结如下:

Throws exceptionSpecial valueBlocksTimes out
Insertadd(e)offer(e)put(e)offer(e, time, unit)
Removeremove()poll()take()poll(time, unit)
Examineelement()peek()not applicablenot applicable

BlockingQueue 的各个实现都遵循了这些规则,当然我们也不用死记这个表格,知道有这么回事,然后写代码的时候根据自己的需要去看方法的注释来选取合适的方法即可。

对于 BlockingQueue,我们的关注点应该在 put(e) 和 take() 这两个方法,因为这两个方法是带阻塞的。

BlockingQueue 不接受 null 值的插入,相应的方法在碰到 null 的插入时会抛出 NullPointerException 异常。null 值在这里通常用于作为特殊值返回(表格中的第三列),代表 poll 失败。所以,如果允许插入 null 值的话,那获取的时候,就不能很好地用 null 来判断到底是代表失败,还是获取的值就是 null 值。

一个 BlockingQueue 可能是有界的,如果在插入的时候,发现队列满了,那么 put 操作将会阻塞。通常,在这里我们说的无界队列也不是说真正的无界,而是它的容量是 Integer.MAX_VALUE(21亿多)。

BlockingQueue 是设计用来实现生产者-消费者队列的,当然,你也可以将它当做普通的 Collection 来用,前面说了,它实现了 java.util.Collection 接口。例如,我们可以用 remove(x) 来删除任意一个元素,但是,这类操作通常并不高效,所以尽量只在少数的场合使用,比如一条消息已经入队,但是需要做取消操作的时候。

BlockingQueue 的实现都是线程安全的,但是批量的集合操作如 addAll, containsAll, retainAll 和 removeAll 不一定是原子操作。如 addAll© 有可能在添加了一些元素后中途抛出异常,此时 BlockingQueue 中已经添加了部分元素,这个是允许的,取决于具体的实现。

BlockingQueue 不支持 close 或 shutdown 等关闭操作,因为开发者可能希望不会有新的元素添加进去,此特性取决于具体的实现,不做强制约束。

最后,BlockingQueue 在生产者-消费者的场景中,是支持多消费者和多生产者的,说的其实就是线程安全问题。

相信上面说的每一句都很清楚了,BlockingQueue 是一个比较简单的线程安全容器,下面我会分析其具体的在 JDK 中的实现,这里又到了 Doug Lea 表演时间了。


BlockingQueue 实现之 ArrayBlockingQueue

ArrayBlockingQueue 是 BlockingQueue 接口的有界队列实现类,底层采用数组来实现。

其并发控制采用可重入锁来控制,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。

ArrayBlockingQueue 共有以下几个属性:

// 用于存放元素的数组
final Object[] items;
// 下一次读取操作的位置
int takeIndex;
// 下一次写入操作的位置
int putIndex;
// 队列中的元素数量
int count;

// 以下几个就是控制并发用的同步器
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

我们用个示意图来描述其同步机制:

ArrayBlockingQueue 实现并发同步的原理就是,读操作和写操作都需要获取到 AQS 独占锁才能进行操作。如果队列为空,这个时候读操作的线程进入到读线程队列排队,等待写线程写入新的元素,然后唤醒读线程队列的第一个等待线程。如果队列已满,这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移除腾出空间,然后唤醒写线程队列的第一个等待线程。

所谓读写队列,也就对应着消费者和生产者队列

对于 ArrayBlockingQueue,我们可以在构造的时候指定以下三个参数:

  • 队列容量,其限制了队列中最多允许的元素个数;
  • 指定独占锁是公平锁还是非公平锁。非公平锁的吞吐量比较高,公平锁可以保证每次都是等待最久的线程获取到锁;
  • 可以指定用一个集合来初始化,将此集合中的元素在构造方法期间就先添加到队列中。

源码解读

构造函数

    public ArrayBlockingQueue(int capacity) 
        this(capacity, false);
    

    public ArrayBlockingQueue(int capacity, boolean fair) 
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    

可以看到items数组分配的大小,就是由我们构造函数传入的长度决定的,并且默认是构造出来的ReentrantLock是非公平实现。


offer

  • offer方法: 尝试插入队列,如果成功,返回true,如果队列满了,插入失败,那么直接返回false
public class Main 
    @SneakyThrows
    public static void main(String[] args) 
        ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<String>(2);
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.execute(()->
            try 
                PauseUtil.pauseRandomTime();
                arrayBlockingQueue.offer("123");
                arrayBlockingQueue.offer("123");
             catch (InterruptedException e) 
                e.printStackTrace();
            
        );

        executorService.execute(()->
            try 
                PauseUtil.pauseRandomTime();
                String take = arrayBlockingQueue.take();
             catch (InterruptedException e) 
                e.printStackTrace();
            
        );
        System.in.read();
    

源码:

    public boolean offer(E e) 
        Objects.requireNonNull(e);
        //加锁--防止并发调用enqueue方法--该方法并不是线程安全的,因此需要加锁保护
        final ReentrantLock lock = this.lock;
        lock.lock();
        try 
            //如果队列满了--插入失败,返回false
            if (count == items.length)
                return false;
            else 
               //将元素入队---然后返回true
                enqueue(e);
                return true;
            
         finally 
        //解锁--唤醒阻塞在当前锁上的其他线程
            lock.unlock();
        
    

通用入队的enqueue源码如下:

    private void enqueue(E e) 
        final Object[] items = this.items;
        //当前元素插入到数组第几个位置,是由putIndex记录的
        items[putIndex] = e;
        //这里数组是作为循环队列使用的--后面会详细说明--这里先不管
        if (++putIndex == items.length) putIndex = 0;
        //计数加一
        count++;
        //队列此时不为空了,需要唤醒阻塞在notEmpty条件队列上的线程
        notEmpty.signal();
    

poll

  • poll方法: 尝试从阻塞队列头部获取一个元素,没有元素就返回null,否则返回对应的元素
    public E poll() 
        //锁住资源,因此enqueue方法并不是线程安全的
        final ReentrantLock lock = this.lock;
        lock.lock();
        try 
           //count==0说明,队列没有元素
            return (count == 0) ? null : dequeue();
         finally 
            lock.unlock();
        
    

dequeue是通用的出队代码:

    private E dequeue() 
        final Object[] items = this.items;
        //takeIndex是表示当前队列头部处于哪个位置(循环队列)---下面讲
        E e = (E) items[takeIndex];
        //对应位置的元素值置空
        items[takeIndex] = null;
        //循环队列--下面讲
        if (++takeIndex == items.length) takeIndex = 0;
        //计数减一
        count--;
        //可忽略
        if (itrs != null)
            itrs.elementDequeued();
        //此时队列一定不是满的,可以唤醒阻塞在notFull条件上的生产者线程
        notFull.signal();
        return e;
    

循环队列

BlockingQueue底层用数组模拟的队列实现,可以参考下图所示:


add

通过调用offer判断插入结果,如果插入失败,那么抛出异常即可。

   public boolean add(E e) 
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    

remove

通过调用poll判断删除结果,如果返回null,表示阻塞队列为空,删除失败,那么抛出异常

    public E remove() 
        E x = poll();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    

put

put是阻塞添加,如果此时队列满了,就需要阻塞等到队列不满时被唤醒,然后继续尝试添加:

    public void put(E e) throws InterruptedException 
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;
        //同样是为了保护非线程安全的enqueue方法
        //如果阻塞等待获取锁期间,被打断了,那么会抛出异常
        lock.lockInterruptibly();
        try 
           //如果此时阻塞队列已经满了,那么就将当前线程阻塞在notFull条件对象对应的阻塞队列上--释放当前线程占有的lock锁
           //被唤醒后,将当前线程转移到lock锁关联的阻塞队列上,等待获取到锁后继续执行
            while (count == items.length)
                notFull.await();
            //条件满足,入队    
            enqueue(e);
         finally 
            lock.unlock();
        
    

有关AQS,ReentrantLock和Condition的知识,不清楚的,去看我之前分享的文章。


take

take是阻塞获取,如果此时阻塞队列为空,当前线程需要阻塞在notEmpty条件关联的阻塞队列上:

    public E take() throws InterruptedException 
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try 
            //如果阻塞队列为空
            while (count == 0)
           //如果此时阻塞队列为空,那么就将当前线程阻塞在notEmpty条件对象对应的阻塞队列上--释放当前线程占有的lock锁
           //被唤醒后,将当前线程转移到lock锁关联的阻塞队列上,等待获取到锁后继续执行
                notEmpty.await();
            return dequeue();
         finally 
            lock.unlock();
        
    

peek

peek用来获取队头元素,如果队列为空,返回null :

    public E peek() 
        final ReentrantLock lock = this.lock;
        //这里要加锁保护是因为如果从items数组获取元素时不加锁,此时其他线程从items中添加元素或者删除元素,会导致并发问题产生
        lock.lock();
        try 
           //takeIndex是队头索引
            return itemAt(takeIndex); // null when queue is empty
         finally 
            lock.unlock();
        
    
    final E itemAt(int i) 
        //如果队头元素都为空,说明队列为空
        return (E) items[i];
    

element

如果队列为空抛出异常,否则返回队头元素:

    public E element() 
        E x = peek();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    

ArrayBlockingQueue小结

BlockingQueue其中一个实现类ArrayBlockingQueue源码到此就基本分析结束了,可以看出ArrayBlockingQueue本身实现非常简单,主要是借助了ReentrantLock来保证线程安全;Condition条件队列来让线程在不满足条件时阻塞,满足条件时被唤醒。

当然,ReentrantLock和Condition最终都是借助于JUC框架底层基石AQS完成的,所以: 基础不牢,地动山摇。


BlockingQueue 实现之 LinkedBlockingQueue

LinkedBlockingQueue底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用。

源码解读

构造函数

// 传说中的无界队列
public LinkedBlockingQueue() 
    this(Integer.MAX_VALUE);


// 传说中的有界队列
public LinkedBlockingQueue(int capacity) 
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);

我们看看这个类有哪些属性:

// 队列容量
private final int capacity;

// 队列中的元素数量
private final AtomicInteger count = new AtomicInteger(0);

// 队头
private transient Node<E> head;

// 队尾
private transient Node<E> last;

// take, poll, peek 等读操作的方法需要获取到这个锁
private final ReentrantLock takeLock = new ReentrantLock();

// 如果读操作的时候队列是空的,那么等待 notEmpty 条件
private final Condition notEmpty = takeLock.newCondition();

// put, offer 等写操作的方法需要获取到这个锁
private final ReentrantLock putLock = new ReentrantLock();

// 如果写操作的时候队列是满的,那么等待 notFull 条件
private final Condition notFull = putLock.newCondition();

这里用了两个锁,两个 Condition,简单介绍如下:

takeLock 和 notEmpty 怎么搭配:如果要获取(take)一个元素,需要获取 takeLock 锁,但是获取了锁还不够,如果队列此时为空,还需要队列不为空(notEmpty)这个条件(Condition)。

putLock 需要和 notFull 搭配:如果要插入(put)一个元素,需要获取 putLock 锁,但是获取了锁还不够,如果队列此时已满,还需要队列不是满的(notFull)这个条件(Condition)。

首先,这里用一个示意图来看看 LinkedBlockingQueue 的并发读写控制,然后再开始分析源码:

这里的链表就是最简单的链表结构:

    static class Node<E> 
        E item;
        Node<E> next;
        Node(E x)  item = x; 
    

offer

    public boolean offer(E e) 
        if (e == null) throw new NullPointerException();
        //没加锁上来直接进行一波阻塞队列是否已满的判断
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        final int c;
        final Node<E> node = new Node<E>(e);
        //生产者使用的putLock
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try 
             //上锁后,还需要进行一次判断,因为鬼知道是直接就获取到了锁,还是阻塞了一会才拿到,那么这段阻塞时间是否存在其他线程往阻塞队列添加元素呢? 
            if (count.get() == capacity)
                return false;
            //加入阻塞队列    
            enqueue(node);
            //计数加一
            c = count.getAndIncrement();
            //1:
            if (c + 1 < capacity)
                notFull.signal();
         finally 
            putLock.unlock();
        
        //2:
        if (c == 0)
            signalNotEmpty();
        return true;
    

通用的入队代码很简单,就是插入到链表尾部:

    private void enqueue(Node<E> node) 
        last = last.next = node;
    

signalNotEmpty负责唤醒在notEmpty条件队列上阻塞着的生产者们:

    private void signalNotEmpty() 
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try 
            //唤醒阻塞在notEmpty条件队列上的消费者们
            notEmpty.signal();
         finally 
            takeLock.unlock();
        
    

不知道大家有无注意到标号的1和2处,这一点是和ArrayBlockingQueue不同的,这两处代码的作用很好理解,但是令人费解的是,明明offer是非阻塞调用,那么阻塞队列按理是不会被使用到的,除非是混合调用阻塞和非阻塞的方法,如果有小伙伴清楚原因的,可以在评论区留言。


poll

    public E poll() 
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        final E x;
        final int c;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try 
            if (count.get() == 0)
                return null;
             //简单的从链表移除节点的操作   
            x = dequeue();
            //计数减一
            c = count.getAndDecrement();
            //1:
            if (c > 1)
                notEmpty.signal();
         finally 
            takeLock.unlock();
        
        //2:
        if (c == capacity)
            signalNotFull();
        return x;
    

除了两处标号地方,其他操作和ArrayBlockingQueue一致。


LinkedBlockingQueue小结

LinkedBlockingQueue总体实现思路和ArrayBlockingQueue一致,但是LinkedBlockingQueue还是有一些额外的小动作比较令人费解。


BlockingQueue 实现之 SynchronousQueue

它是一个特殊的队列,它的名字其实就蕴含了它的特征 - - 同步的队列。为什么说是同步的呢?这里说的并不是多线程的并发问题,而是因为当一个线程往队列中写入一个元素时,写入操作不会立即返回,需要等待另一个线程来将这个元素拿走;同理,当一个读线程做读操作的时候,同样需要一个相匹配的写线程的写操作。这里的 Synchronous 指的就是读线程和写线程需要同步,一个读线程匹配一个写线程。

我们比较少使用到 SynchronousQueue 这个类,不过它在线程池的

以上是关于并发阻塞队列BlockingQueue解读的主要内容,如果未能解决你的问题,请参考以下文章

Java并发多线程编程——阻塞队列(BlockingQueue)

并发编程-J.U.C组件拓展之阻塞队列BlockingQueue

解读 Java 并发队列 BlockingQueue

Java并发之BlockingQueue

[Java并发编程实战] 阻塞队列 BlockingQueue(含代码,生产者-消费者模型)

Java BlockingQueue阻塞队列