java同步阻塞队列之LinkedBlockingQueue实现原理,和ArrayBlockingQueue对比

Posted Leo Han

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java同步阻塞队列之LinkedBlockingQueue实现原理,和ArrayBlockingQueue对比相关的知识,希望对你有一定的参考价值。

上一篇我们说到ArrayBlockingQueue,底层是数组加锁机制实现同步阻塞队列,这里我们说下另外一个同步阻塞队列LinkedBlockingQueue.
从名字上就可以看出LinkedBlockingQueue底层数据结构是基于链表结构的。我们看下其几个关键属性:

  private final int capacity;
  private final AtomicInteger count = new AtomicInteger();
transient Node<E> head;
    private transient Node<E> last;
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();

与ArrayBlockingQueue不同,LinkedBlockingQueue维护了两把锁,一个takeLock 和一个putLock 分别控制并发读和并发写。而对于读取和写入来说,操作的分别是链表的头部和尾部,不存在竞争关系,理论上

public LinkedBlockingQueue() 
        this(Integer.MAX_VALUE);
    
    public LinkedBlockingQueue(int capacity) 
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    

可以看到,默认构造条件下,容量大小是Integer.MAX_VALUE,我们接下来看看其takeput操作:

public void put(E e) throws InterruptedException 
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try 
        	//队列满的时候,阻塞,等待唤醒
            while (count.get() == capacity) 
                notFull.await();
            
            // 被其他线程唤醒,这时候获取到putLock,且队列没有满,插入元素
            enqueue(node);
            // 需要注意的是,c 为实际大小 -1 
            c = count.getAndIncrement();
            // 如果队列未满,唤醒其他生产线程
            if (c + 1 < capacity)
                notFull.signal();
         finally 
            putLock.unlock();
        
        // c == 0 的时候,表名链表中元素为c+1个,这时候尝试唤醒消费者
        // 这里的逻辑可能会有点绕,并不是判断只要 c > 0 就去唤醒,
        // 因为消费线程阻塞的条件是链表元素个数为0,当count==0的时候,即链表元素个数为0的时候链表空了,这时候消费者会被阻塞
        // 而这里生产者通过锁机制,只能一次插入一个,这时候链表是空的,插入后,链表容量为1,这时候就可以唤醒消费者了
        if (c == 0)
            signalNotEmpty();
    

public E take() throws InterruptedException 
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try 
            while (count.get() == 0) 
                notEmpty.await();
            
            x = dequeue();
            c = count.getAndDecrement();
            // 如果队列不为空,唤醒其他消费等待线程
            if (c > 1)
                notEmpty.signal();
         finally 
            takeLock.unlock();
        
        if (c == capacity)
            signalNotFull();
        return x;
    

我们看下,LinkedBlockingQueue中的入队和出队是如何操作的:

private void enqueue(Node<E> node) 
        last = last.next = node;
    
    private E dequeue() 
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; 
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    

LinkedBlockingQueue在构造初始化的时候,就会初始化一个空的节点:

 last = head = new Node<E>(null);

然后在每次出队的时候,他的处理很微妙,并不是我们理解的将队列头元素弹出,LinkedBlockingQueue的队列的头结点永远是个空节点,如果需要出队(由于有计数器的控制),队列肯定不为空,表名head头结点肯定有下个节点,这时候并不是将头结点出队返回,而是将头结点的下一个节点设置为头结点,并且将这个节点的值取出,然后设置该节点的值为空,这样就有了一个新的空节点,出队操作的是head节点,入队操作则是last节点,这样保证了并发入队和出队的时候,二者操作的不会是同一个节点。

迭代

LinkedBlockingQueue是支持通过迭代器进行迭代处理,在迭代的时候,会同时对takeLock和putLock上锁,这时候既不能入队也不能出队。

public Iterator<E> iterator() 
        return new Itr();
    
Itr() 
            fullyLock();
            try 
                current = head.next;
                if (current != null)
                    currentElement = current.item;
             finally 
                fullyUnlock();
            
        

public E next() 
            fullyLock();
            try 
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                current = nextNode(current);
                currentElement = (current == null) ? null : current.item;
                return x;
             finally 
                fullyUnlock();
            
        
void fullyLock() 
        putLock.lock();
        takeLock.lock();
    


    void fullyUnlock() 
        takeLock.unlock();
        putLock.unlock();
    

可以看到,在使用迭代器进行迭代的时候,使用fullyLock进行putLocktakeLock的上锁。

对比ArrayBlockingQueue

和ArrayBlockingQueue对比,二者提供的功能基本一致,只不过二者在底层数据结构上不一样

  • ArrayBlockingQueue底层采用的是有界数组,数组大小一旦确定后后期不能在调整,且在初始化之后就分配了固定大小的数组容量空间
  • LinkedBlockingQueue底层基于链表,其大小则是通过AtomicInteger来判定,但是每次入队都需要新建一个Node对象。

以上是关于java同步阻塞队列之LinkedBlockingQueue实现原理,和ArrayBlockingQueue对比的主要内容,如果未能解决你的问题,请参考以下文章

java同步阻塞队列之ArrayBlockingQueue实现原理

java同步阻塞队列之LinkedBlockingQueue实现原理,和ArrayBlockingQueue对比

java同步阻塞队列之DelayQueue实现原理,PriorityQueue原理

Java同步数据结构之ConcurrentLinkedQueue

Java同步数据结构之LinkedBlockingQueue

并发编程实践之公平有界阻塞队列实现