通俗易懂的JUC源码剖析-LinkedBlockingQueue

Posted 小强大人

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了通俗易懂的JUC源码剖析-LinkedBlockingQueue相关的知识,希望对你有一定的参考价值。

前言

LinkedBlockingQueue实现了BlockingQueue,它是阻塞队列的一种,可用于线程池中。不同于ConcurrentLinkedQueue的CAS非阻塞算法,它底层是用锁实现的阻塞队列。

实现原理

先来看关键属性:

// 队列容量,最大为Integer.MAX_VALUE
private final int capacity;
// 队列长度
private final AtomicInteger count = new AtomicInteger();
// 头结点
transient Node<E> head;
// 尾结点
private transient Node<E> last;
// 移除操作的锁,take/poll方法用到
private final ReentrantLock takeLock = new ReentrantLock();
// 移除操作需要等待的条件notEmpty,与takeLock绑定
private final Condition notEmpty = takeLock.newCondition();
// 入队操作的锁,put/offer方法用到
private final ReentrantLock putLock = new ReentrantLock();
// 入队操作需要等待的条件notFull,与putLock绑定
private final Condition notFull = putLock.newCondition();

可以看到,LinkedBlockingQueue内部是用单向链表实现的,并且它有两把锁:takeLock和putLock,以及对应的两个等待条件:notEmpty和notFull。takeLock控制同一时刻只有一个线程从队列头部获取/移除元素,putLock控制同一时刻只有一个线程在队列尾部添加元素。

再来看关键方法:

1.无参构造函数

public LinkedBlockingQueue() {
    // 调用有参构造函数,初始化容量capacity为int最大值
    this(Integer.MAX_VALUE);
}

2.有参构造函数

public LinkedBlockingQueue(int capacity) {
    // 容量不能小于0,注意也不能等于0,这点与常规的集合不同
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    // 初始化头结点和尾结点为哑节点
    last = head = new Node<E>(null);
}

3.put()操作

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException(); 
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        // 队列满了则阻塞等待,用while而不是if是为了避免虚假唤醒
        while (count.get() == capacity) {
            notFull.await();
        }
        // 元素加入队尾
        enqueue(node);
        c = count.getAndIncrement();
        // 如果队列没满,则唤醒某个等待在notFull条件上的线程
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // 若put操作之前队列长度为0,则put后不为空了,则唤醒某个等待在notEmpty条件上的线程
    if (c == 0)
        signalNotEmpty();
}

4.put()操作

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
           return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

offer()方法结构与put()类似,区别在于如果队列满了,不会一直阻塞等待,而是直接返回false。
5.take()操作

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
           // 队列为空,则阻塞等待
           while (count.get() == 0) {
               notEmpty.await();
           }
           // 移除队列头部元素
           x = dequeue();
           c = count.getAndDecrement();
           // 若移除操作前队列长度>=2,则唤醒某个等待在notEmpty条件的线程
           if (c > 1)
               notEmpty.signal();
     } finally {
          takeLock.unlock();
     }
     // 若移除操作前队列是满的,则唤醒某个等待在notFull条件的线程
     if (c == capacity)
         signalNotFull();
     return x;
}

6.poll()操作

public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
         if (count.get() > 0) {
             x = dequeue();
             c = count.getAndDecrement();
             if (c > 1)
                notEmpty.signal();
         }
    } finally {
         takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

poll()方法与take()结构类似,区别在于若队列是空的,不会一直阻塞,而是返回false。

总结下,take()、put()方法是阻塞的,poll()、offer()是非阻塞的。

顺带看下enqueue()和dequeue()方法源码:

private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null; 
    last = last.next = node;
}
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null; 
    Node<E> h = head;
    // 获取第一个元素结点first
    Node<E> first = h.next;
    // 将头结点自引用,并被垃圾回收掉
    h.next = h; // help GC
    // 将头结点指向第一个元素结点first
    head = first;
    // 获取第一个元素结点的值
    E x = first.item;
    // 将第一个元素结点的值置为null,成为新的哑节点
    first.item = null;
    // 返回被移除的节点元素值
    return x;
}

再来看看remove(Object o)方法:

public boolean remove(Object o) {
    if (o == null) return false;
    fullyLock();
    try {
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            if (o.equals(p.item)) {
                unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        fullyUnlock();
    }
}

其中fullyLock()、unlink(p, trail)代码如下:

// 通俗获取put和take操作锁,防止移除目标元素时有其他线程在修改队列。
void fullyLock() {
    putLock.lock();
    takeLock.lock();
}
// 移除目标结点p,并将它的前继结点trail与它的后继结点p.next连接起来。
void unlink(Node<E> p, Node<E> trail) {
    // assert isFullyLocked();
    // p.next is not changed, to allow iterators that are  
    // traversing p to maintain their weak-consistency   guarantee. 
    p.item = null;
    trail.next = p.next;
    // 如果移除的是尾结点,则重置尾结点为它的前继结点
    if (last == p)
        last = trail;
    if (count.getAndDecrement() == capacity)
        notFull.signal();
}

参考资料:
《Java并发编程之美》

以上是关于通俗易懂的JUC源码剖析-LinkedBlockingQueue的主要内容,如果未能解决你的问题,请参考以下文章

通俗易懂的JUC源码剖析-LinkedBlockingQueue

通俗易懂的JUC源码剖析-StampedLock

通俗易懂的JUC源码剖析-FutureTask

通俗易懂的JUC源码剖析-ArrayBlockingQueue

通俗易懂的JUC源码剖析-ReentrantLock&AQS

通俗易懂的JUC源码剖析-CompletionService