双端阻塞队列LinkedBlockingDeque源码深入解析

Posted 蜀中孤鹰

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了双端阻塞队列LinkedBlockingDeque源码深入解析相关的知识,希望对你有一定的参考价值。

 

1 前言

与普通队列相比,阻塞队列另外支持两个附加操作,这两个附加的操作支持阻塞的插入和移除方法。

  • ①支持阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,直到队列不满。

  • ②支持阻塞的移除方法:在队列为空时,获取元素的线程会等待队列变为非空。

一般的阻塞队列相比,只能在“尾部入队、在头部出队”,而双端阻塞队列,在头部和尾部均能入队,在头部和尾部均能出队。

LinkedBlockingDeque继承于抽象类AbstractQueue,并实现了BlockingDeque接口(BlockingDeque扩展了BlockingQueue,它表示一个双向阻塞队列的接口)。它内部主要使用两个”条件“来实现”阻塞插入“、”阻塞移出“,这两个条件分别是”未满“、”非空“。LinkedBlockingDeque是一个有界的双向阻塞队列,如果构造方法没有指定容量,其默认容量是无限大。LinkedBlockingDeque是一个基于双向链表结构的双端队列,内部使用静态内部类Node表示一个链表节点(本文基于JDK1.8)

 

BlockingDeque接口方法说明

public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> {
    //BlockingDeque自身定义的方法
//在队列头部插入元素,若容量已满抛出异常IllegalStateException
    void addFirst(E e);
    //在队列尾部插入元素,若容量已满抛出异常IllegalStateException
    void addLast(E e);
    //在队列头部插入元素,若容量已满返回false
    boolean offerFirst(E e);
    //在队列尾部插入元素,若容量已满返回false
    boolean offerLast(E e);
    //在队列头部插入元素,若容量已满则阻塞等待
    void putFirst(E e) throws InterruptedException;
    //在队列尾部插入元素,若容量已满则阻塞等待
    void putLast(E e) throws InterruptedException;
    //在队列头部插入元素,若容量已满则超时等待
    boolean offerFirst(E e, long timeout, TimeUnit unit) throws InterruptedException;
    //在队列尾部插入元素,若容量已满则超时等待
    boolean offerLast(E e, long timeout, TimeUnit unit) throws InterruptedException;
    //在队列头部删除并返回元素,若队列已空则阻塞等待
    E takeFirst() throws InterruptedException;
    //在队列尾部删除并返回元素,若队列已空则阻塞等待
    E takeLast() throws InterruptedException;
    //在队列头部删除并返回元素,若队列已空则超时等待,在超时之前无法出队就返回null
    E pollFirst(long timeout, TimeUnit unit) throws InterruptedException;
    //在队列尾部删除并返回元素,若队列已空则超时等待,在超时之前无法出队就返回null
    E pollLast(long timeout, TimeUnit unit) throws InterruptedException;
    //从队列头部开始算起,在指定元素第一次出现的位置将之删除
    boolean removeFirstOccurrence(Object o);
    //从队列尾部开始算起,在指定元素第一次出现的位置将之删除
    boolean removeLastOccurrence(Object o);
​
​
   //父接口BlockingQueue的方法
boolean add(E e);
    boolean offer(E e);
    void put(E e) throws InterruptedException;
    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
    E remove();
    E poll();
    E take() throws InterruptedException;
    E poll(long timeout, TimeUnit unit) throws InterruptedException;
    E element();
    E peek();
    boolean remove(Object o);
    public boolean contains(Object o);
    public int size();
    Iterator<E> iterator();
​
​
    //栈方法
//在队列的头部压栈
    void push(E e);
    //在队列的头部出栈
    E pop();
}
BlockingDeque

 

2 静态内部类Node与成员变量

Node类

static final class Node<E> {
    E item;
    Node<E> prev;
    Node<E> next;
    Node(E x) {
        item = x;
    }
}

一个Node对象代表一个链表节点,其属性item表示当前节点保存的元素(item为null时表示当前节点已经被删除),next属性表示当前节点的后继节点,prev属性表示当前节点的前驱节点。

 

成员变量

/**
 * Pointer to first node.
 * Invariant: (first == null && last == null) ||
 *            (first.prev == null && first.item != null)
 */
transient Node<E> first;
/**
 * Pointer to last node.
 * Invariant: (first == null && last == null) ||
 *            (last.next == null && last.item != null)
 */
transient Node<E> last;
/** Number of items in the deque */
private transient int count;
/** Maximum number of items in the deque */
private final int capacity;
/** Main lock guarding all access */
final ReentrantLock lock = new ReentrantLock();
/** Condition for waiting takes */
private final Condition notEmpty = lock.newCondition();
/** Condition for waiting puts */
private final Condition notFull = lock.newCondition();

first: 队列的头节点

last:队列的尾节点

count:队列中的元素个数

capacity:队列的指定容量

lock:可重入锁,保证所有数据访问的线程安全

notEmpty: 出队时的“非空”条件

notFull:入队时的“未满”条件

3 构造方法

LinkedBlockingDeque(int)是其主要的构造方法,构造方法主要涉及对队列容量的初始化。在使用无参构造方法时,阻塞队列的容量是Integer.MAX_VALUE,即无限大。

在初始化后,队列中不含任何元素时,头节点 、尾节点均是null。

public LinkedBlockingDeque() {
    this(Integer.MAX_VALUE);
}
//指定容量
public LinkedBlockingDeque(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
}
//将某集合元素放入阻塞队列中
public LinkedBlockingDeque(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock lock = this.lock;
    lock.lock(); // Never contended, but necessary for visibility
    try {
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (!linkLast(new Node<E>(e)))
                throw new IllegalStateException("Deque full");
        }
    } finally {
        lock.unlock();
    }
}

 

4 主要方法

1) 添加元素

(1) 在队尾添加元素

put offer add offer 等方法都是在队列的尾部添加元素。它们将核心实现委托给 putLastofferLast实现

public void put(E e) throws InterruptedException {
    putLast(e);
}
public boolean offer(E e) {
    return offerLast(e);
}
public boolean add(E e) {
    addLast(e);
    return true;
}
public boolean  offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    return offerLast(e, timeout, unit);
}

 

putLast

putLast 在队尾添加元素

putLast先获取lock锁(lock.lock()),然后尝试在队尾添加一个新节点(linkLast(node)),若链接新节点失败,则(notFull.await())休眠等待直到等到”未满“通知。

public void putLast(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        while (!linkLast(node))
            notFull.await();//休眠等待
    } finally {
        lock.unlock();
    }
}

 

linkLast尝试在队列的尾部添加一个新节点。主要逻辑:

①若队列已满,直接返回false(不能链接新节点) ; ②将待入队节点node作为新的尾节点添加在队尾(last = node)并更新相关链接关系;③元素计数加1(++count) ;④唤醒一个等待"非空"条件的线程(notEmpty.signal()),最后返回true.

private boolean linkLast(Node<E> node) {
    // assert lock.isHeldByCurrentThread();
    if (count >= capacity)//队列已满,不能链接新节点
        return false;
    Node<E> l = last;
    node.prev = l;//设置node的前驱节点,它的前驱为原尾节点
    last = node;//新尾节点是刚添加的节点node
    //更新原尾节点l的后继节点
    if (first == null) //队列中没有任何节点,last first均未初始化,
       //队列中只有一个节点(元素)时,头节点first 和尾节点last指定同一节点node 
        first = node;
    else
        l.next = node;//原尾节点l的后继节点是新尾节点(刚添加的节点node)
    ++count;//元素个数加1
    notEmpty.signal();//通知一个等待"非空"条件的线程
    return true;
}

 

offerLast

offerLast方法与putLast类似,offerLast在检测到队列已满时会直接返回false,不会阻塞等待。

public boolean offerLast(E e) {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return linkLast(node);
    } finally {
        lock.unlock();
    }
}

 

多参数的offerLast方法,它可以看作putLast的超时版本。

public boolean offerLast(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (!linkLast(node)) {
            if (nanos <= 0)//在超时之后没能入队,返回false
                return false;
            nanos = notFull.awaitNanos(nanos);//超时等待
        }
        return true;
    } finally {
        lock.unlock();
    }
}

 

addLast

addLast 在队尾添加元素,若队列已满抛出异常。

public void addLast(E e) {
    if (!offerLast(e))
        throw new IllegalStateException("Deque full");
}

(2) 在队首添加元素

addFirst

push 在 在队列的头部插入元素,若队列已经满抛出IllegalStateException异常

public void push(E e) {
    addFirst(e);
}
    public void addFirst(E e) {
        if (!offerFirst(e))
            throw new IllegalStateException("Deque full");
    }

putFirst

putLast在队列头部插入元素,若容量已满则阻塞等待

先获取lock锁(lock.lock()),然后尝试在队首添加一个新节点(linkFirst(node)),若链接新节点失败,则(notFull.await())休眠等待直到等到”未满“通知。

public void putFirst(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        while (!linkFirst(node))
            notFull.await();
    } finally {
        lock.unlock();
    }
}

 

linkLast尝试在队列的头部链接一个新节点。主要逻辑:

①若队列已满,直接返回false(不能链接新节点) ; ②将待入队节点node作为新的头节点添加在队首(first = node)并更新相关链接关系;③元素计数加1(++count) ;④通知一个等待"非空"条件的线程(notEmpty.signal()),最后返回true.

private boolean linkFirst(Node<E> node) {
    // assert lock.isHeldByCurrentThread();
    if (count >= capacity)
        //队列已满,不能链接新节点
        return false;
    Node<E> f = first;
    node.next = f;//设置node的后继节点,它的后继为原头节点
    first = node;////新的尾节点是刚添加的节点node
    //更新原头节点f的前驱节点
    if (last == null)//队列中没有任何节点,last first均未初始化
         //队列中只有一个节点(元素)时,头节点first 和尾节点last指定同一节点node
        last = node;
    else
        f.prev = node;//原头节点f的前驱节点是新头节点(刚添加的节点node)
    ++count;//元素个数加1
    notEmpty.signal();//通知一个等待"非空"条件的线程
    return true;
}

offerFirst

offerFirst方法与putFirst类似,但offerFirst在检测到队列已满时会直接返回false,不会阻塞等待。

public boolean offerFirst(E e) {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return linkFirst(node);
    } finally {
        lock.unlock();
    }
}

多参数的offerFirst方法,它可以看作putFirst的超时版本.

public boolean offerFirst(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (!linkFirst(node)) {
            if (nanos <= 0)  //在超时后,表明入队失败
                return false;
            nanos = notFull.awaitNanos(nanos);//超时等待
        }
        return true;
    } finally {
        lock.unlock();
    }
}

 

2) 删除元素

(1) 在队首删除元素

remove pop poll take 等方法都是在队列的尾部添加元素。它们将核心实现委托给 removeFirst 、pollFirst 、takeFirst等实现

public E take() throws InterruptedException {
    return takeFirst();
}
public E poll() {
    return pollFirst();
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  return pollFirst(timeout, unit);
}
public E remove() {
    return removeFirst();
}
public E pop() {
    return removeFirst();
}

 

takeFirst

takeFirst 先获取lock锁(lock.lock()),然后尝试在队首取消一个节点的链接(unlinkFirst()),若取消链接失败(队列已空),则(notEmpty.await())休眠等待直到等到”非空“通知。

public E takeFirst() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E x;
        while ( (x = unlinkFirst()) == null)
            notEmpty.await();
        return x;
    } finally {
        lock.unlock();
    }
}

 

unlinkFirst尝试取消头节点在链表中的链接关系

①若队列为空,直接返回null(不能取消链接) ; ②将原头节点的后继节点(Node<E> n = f.next)作为新头节点(first = n)并更新相关链接关系;③元素计数自减1(--count) ;④唤醒一个等待"未满"条件的线程(notFull.signal()),最后返回原头节点中的元素(E item = f.item).

private E unlinkFirst() {
    // assert lock.isHeldByCurrentThread();
    Node<E> f = first;
    if (f == null)
        //链表未初始化,队列中没有任何元素,返回null
        return null;
    Node<E> n = f.next;
    E item = f.item;//保存原头节点的item,最终需要返回头节点的item
    f.item = null;//然后原头节点的item属性赋空
    f.next = f; // help GC
    first = n;//新头节点是原头节点的后继节点
    //设置新头节点的前驱节点
    if (n == null) //删除头节点前链表中只有一个节点
        //将头节点first、尾节点tail都设为null,链表中没有任何节点了。
        last = null;
    else//删除头节点前链表中至少有两个节点(元素)
        n.prev = null;//将新头节点的prev设为null(头节点没有前驱节点)
    --count;//元素个数减1
    notFull.signal();//唤醒一个等待”未满“条件的线程
    return item;
}

 

pollFirst

pollFirst方法与takeFirst类似,只不过pollFirst在检测到队列为空时会直接返回null,不会阻塞等待。

public E pollFirst() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return unlinkFirst();
    } finally {
        lock.unlock();
    }
}

pollFirst(long,TimeUnit)可以看作是超时版本的takeFirst .

public E pollFirst(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        E x;
        while ( (x = unlinkFirst()) == null) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return x;
    } finally {
        lock.unlock();
    }
}

 

removeFirst

removeFirst直接委托pollFirst实现,若队列为空,则抛出异常NoSuchElementException。

public E removeFirst() {
    E x = pollFirst();
    if (x == null) throw new NoSuchElementException();
    return x;
}

 

(2) 在队尾删除元素

takeLast

takeLast先获取lock锁(lock.lock()),然后尝试在队尾取消一个节点的链接(unlinkLast()),若取消链接失败(队列已空),则(notEmpty.await())休眠等待直到等到”非空“通知。

public E takeLast() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E x;
        while ( (x = unlinkLast()) == null)
            notEmpty.await();
        return x;
    } finally {
        lock.unlock();
    }
}

 

unlinkLast尝试取消尾节点在链表中的链接

①若队列为空,直接返回null(不能取消链接) ; ②将原尾节点的前驱节点(Node<E> p = l.prev)作为新尾节点(last = p)并更新相关链接关系;③元素计数自减1(--count) ;④唤醒一个等待"未满"条件的线程(notFull.signal()),最后返回原尾节点中的元素(E item = l.item).

private E unlinkLast() {
    // assert lock.isHeldByCurrentThread();
    Node<E> l = last;
    if (l == null)
         //链表未初始化,队列中没有任何元素,返回null
        return null;
    Node<E> p = l.prev;
    E item = l.item;//保存尾节点的item,最终需要返回尾节点的item
    l.item = null;//然后将原尾节点的item属性清空
    //prevn属性自指,在使用迭代器时能标识此节点已被删除
    l.prev = l; // help GC 
    last = p;//新尾节点是原尾节点的前驱继节点
     //设置新尾节点的后继节点
    if (p == null)//删除尾节点前 链表中只有一个节点l
         //将头节点first、尾节点tail都设为null,链表中没有任何节点了
        first = null;
    else//删除尾节点前链表中至少有两个节点(元素)
        p.next = null;//将新尾节点的next设为null(尾节点没有后继节点)
    --count;//元素个数减1
    notFull.signal();//唤醒一个等待”未满“条件的线程
    return item;
}

 

pollLast

pollLast方法与takeLast类似,但pollLast在检测到队列为空时会直接返回null,不会阻塞等待。

public E pollLast() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return unlinkLast();
    } finally {
        lock.unlock();
    }
}

pollLast(long,TimeUnit)可以看作是超时版本的takeLast,在超时之前无法出队就返回null.

public E pollLast(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        E x;
        while ( (x = unlinkLast()) == null) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return x;
    } finally {
        lock.unlock();
    }
}

removeLast

removeLast`直接委托pollLast实现,若队列为空,则抛出异常NoSuchElementException。

public E removeLast() {
    E x = pollLast();
    if (x == null) throw new NoSuchElementException();
    return x;
}

 

(3) 删除指定的元素

remove、 removeFirstOccurrence方法均是从队列头部开始向后查找,在给定元素第一次出现的位置上将之删除。

removeLastOccurrence是从队列尾部开始向前查找,在给定元素第一次出现的位置上将之删除。

public boolean remove(Object o) {
    return removeFirstOccurrence(o);
}
    public boolean removeFirstOccurrence(Object o) {
        if (o == null) return false;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            for (Node<E> p = first; p != null; p = p.next) {//从队列头部开始向后查找
                if (o.equals(p.item)) {
                    unlink(p);//取消此节点在链表中的链接关系
                    return true;
                }
            }
            return false;
        } finally {
            lock.unlock();
        }
    }
​
    public boolean removeLastOccurrence(Object o) {
        if (o == null) return false;
        final ReentrantLock lock = this.lock

以上是关于双端阻塞队列LinkedBlockingDeque源码深入解析的主要内容,如果未能解决你的问题,请参考以下文章

JUC系列并发容器之阻塞队列Overview

并发容器2

Java并发包--LinkedBlockingDeque

Deque 双端队列

Java双端队列Deque使用详解

Java阻塞队列BlockingQueue(生产者消费者模型)