ConcurrentLinkedQueue

Posted lenmom

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ConcurrentLinkedQueue相关的知识,希望对你有一定的参考价值。

ConcurrentLinkedQueue是非阻塞无界的,基于FIFO原则,线程安全的队列,新节点的插入都是在队列的尾部插入的(tail节点),该队列适合于多个线程共享同一个集合时使用。

结构:

技术图片

构造函数

/**无参数构造函数,默认创建一个节点为nul的队列,head、tail节点同时指向null节点**/
public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}
/**有参数构造函数,传递一个集合,通过遍历集合将集合中的数据保存在队列中(从尾节点插入)**/
public ConcurrentLinkedQueue(Collection<? extends E> c) {
    Node<E> h = null, t = null;
    for (E e : c) {
        /**检查是否为空**/
        checkNotNull(e);
        Node<E> newNode = new Node<E>(e);
        /**初始队列为空,直接将head、tail同时指向该节点**/
        if (h == null)
            h = t = newNode;
        else {
            /**添加到尾节点**/
            t.lazySetNext(newNode);
            t = newNode;
        }
    }
    /**在参数为null或者size为0执行**/
    if (h == null)
        h = t = new Node<E>(null);
    head = h;
    tail = t;
}

offer插入

/**插入操作执行在tail节点,队列是无界的,不会返回false值**/
public boolean offer(E e) {
    /**检查是否为空**/
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {
            // q为空表明,tail的next节点为null(即tail节点就是尾节点)
            if (p.casNext(null, newNode)) {
                /**执行cas的交换操作,将p(tail)节点的next节点设置成newNode节点,此处是CAS操作,
                  *即使存在多线程操作一次只会有一个几点插入到队列尾部
                 **/
                if (p != t) // 一次跳跃两个节点设置tail节点
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
            p = (t != (t = tail)) ? t : head;
        else
            /**当多个线程同时执行offer操作时,cas失败的线程会执行该操作,设置q节点为tail节点,执行下一次循环**/
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

add插入

add插入操作实际上调用的就是offer操作
public boolean add(E e) {
    return offer(e);
}

poll出队列

poll出队列:
public E poll() {
    restartFromHead:
    for (;;) {
        //从head节点开始遍历
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
            //head节点不为null并且cas操作将head节点设置为null成功
            if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time
                    //更新head节点
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            else if ((q = p.next) == null) {
                //多线程获取节点失败,同时队列中只有一个节点(或者本身队列为空)
                updateHead(h, p);
                return null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                //获取next节点
                p = q;
        }
    }
}

peek出队列

/**peek操作是获取队列的head节点,只获取不移除**/
public E peek() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;
            if (item != null || (q = p.next) == null) {
                updateHead(h, p);
                return item;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

size操作

/**size操作返回队列中节点的数量(数量大于Integer.MAX_VALUE时,返回Integer.MAX_VALUE,但是数据不准确,
  *该队列是线程安全的队列,在获取size操作时可能存在其他的add/remove操作在进行
 **/
public int size() {
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
}

/**获取第一个没有被删除,可用的节点(可以看做是poll/peek操作的变体)**/
Node<E> first() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            boolean hasItem = (p.item != null);
            if (hasItem || (q = p.next) == null) {
                updateHead(h, p);
                return hasItem ? p : null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

remove操作

remove操作:
/**如果队列中存在多个相同的值,每次只会删除第一个**/
public boolean remove(Object o) {
    if (o != null) {
        Node<E> next, pred = null;
        //从队列中第一个可用节点(有效节点)开始遍历
        for (Node<E> p = first(); p != null; pred = p, p = next) {
            boolean removed = false;
            E item = p.item;
            if (item != null) {
                if (!o.equals(item)) {
                    next = succ(p);
                    continue;
                }
                //将remove的节点值置为null
                removed = p.casItem(item, null);
            }

            next = succ(p);
            //如果remove节点有前驱节点,将前驱节点的next指向remove节点的后继节点
            if (pred != null && next != null) // unlink
                pred.casNext(p, next);
            if (removed)
                return true;
        }
    }
    return false;
}

containts操作

containts操作:
/**contains操作与remove操作极其相似,只是不会进行remove,该方法判断结果并不准确,可能存在删除操作**/
public boolean contains(Object o) {
    if (o == null) return false;
    for (Node<E> p = first(); p != null; p = succ(p)) {
        E item = p.item;
        if (item != null && o.equals(item))
            return true;
    }
    return false;
}

 

以上是关于ConcurrentLinkedQueue的主要内容,如果未能解决你的问题,请参考以下文章

初始化后如何更新java.util.concurrent.ConcurrentLinkedQueue#head?

并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue用法

我通过调试ConcurrentLinkedQueue发现一个IDEA的小虫子(bug), vscode复现, eclipse毫无问题

《Java并发编程的艺术》之ConcurrentLinkedQueue

Java并发多线程编程——并发容器ConcurrentLinkedQueue

死磕 java集合之ConcurrentLinkedQueue源码分析