JUC系列并发容器之ConcurrentLinkedQueue(JDK1.8版)

Posted 顧棟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC系列并发容器之ConcurrentLinkedQueue(JDK1.8版)相关的知识,希望对你有一定的参考价值。

ConcurrentLinkedQueue

文章目录

类图

组成

数据结构

一个头结点和一个尾结点

每个结点Node包含item和下一个结点next引用,默认情况下,tail等于head,元素为null。

内部类

private static class Node<E> 
    // 元素
    volatile E item;
    // 下一个结点引用
    volatile Node<E> next;

    /**
     * Constructs a new node.  Uses relaxed write because item can
     * only be seen after publication via casNext.
     */
    // 构造函数
    Node(E item) 
        // 设置item的值
        UNSAFE.putObject(this, itemOffset, item);
    

    // 设置item的值
    boolean casItem(E cmp, E val) 
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    

    void lazySetNext(Node<E> val) 
        // 设置next域的值,并不会保证修改对其他线程立即可见
        UNSAFE.putOrderedObject(this, nextOffset, val);
    

    // 比较并替换next域的值
    boolean casNext(Node<E> cmp, Node<E> val) 
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    

    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE;
    // item域的偏移量
    private static final long itemOffset;
    // next域的偏移量
    private static final long nextOffset;

    static 
        try 
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
         catch (Exception e) 
            throw new Error(e);
        
    

类的属性

// 链表首结点
private transient volatile Node<E> head;
// 链表尾结点
private transient volatile Node<E> tail;

类的构造函数

默认无参构造函数,创建一个空的ConcurrentLinkedQueue。

public ConcurrentLinkedQueue() 
    // 初始化头结点与尾结点
    head = tail = new Node<E>(null);

将给的集合c转换成ConcurrentLinkedQueue,按照此 collection 迭代器的遍历顺序来添加元素。

public ConcurrentLinkedQueue(Collection<? extends E> c) 
    Node<E> h = null, t = null;
    for (E e : c)  //遍历c集合
        // 检查元素不为null
        checkNotNull(e);
        // 新建node结点
        Node<E> newNode = new Node<E>(e);
        // 首结点为空
        if (h == null)
            h = t = newNode;
        else 
            // 尾插法
            // 直接头结点的next域
            t.lazySetNext(newNode);
            // 重新赋值头结点
            t = newNode;
        
    
    // 头结点为null
    if (h == null)
        // 新结头结点与尾结点
        h = t = new Node<E>(null);
    head = h;
    tail = t;

核心方法

方法名描述
boolean add(E e)在此队列的尾部插入指定元素。 由于队列是无界的,因此该方法永远不会抛出IllegalStateException或返回false
void updateHead(Node<E> h, Node<E> p)尝试 CAS 前往 p。 如果成功,将旧头重新指向自身作为succ()的哨兵,如下所示。
Node<E> succ(Node<E> p)如果p.next已链接到self,则返回p的后继结点或头结点,这仅在使用现在不在列表中的过时指针遍历时才为真。
boolean offer(E e)在此队列的尾部插入指定元素。 由于队列是无界的,因此此方法永远不会返回false
E poll()此方法移除并返回此ConcurrentLinkedQueue的头部;如果此队列为空,则返回null。
E peek()此方法返回此ConcurrentLinkedQueue的头部,而不删除它
Node<E> first()返回列表中的第一个活动(未删除)结点,如果没有,则返回 null。 这是 poll/peek 的另一种变体; 这里返回第一个结点,而不是元素。 我们可以让 peek() 成为 first() 的包装器,但这会花费额外的 volatile 读取项,并且需要添加一个重试循环来处理输给并发 poll() 的竞争的可能性。
int size()返回此队列中的元素数。 如果此队列包含多个 Integer.MAX_VALUE 元素,则返回 Integer.MAX_VALUE。

请注意,与大多数集合不同,此方法不是恒定时间操作。 由于这些队列的异步特性,确定当前元素的数量需要 O(n) 遍历。 此外,如果在此方法执行期间添加或删除元素,则返回的结果可能不准确。 因此,这种方法在并发应用程序中通常不是很有用。
boolean remove(Object o)从此队列中移除指定元素的单个实例(如果存在)。 更正式地说,如果该队列包含一个或多个这样的元素,则删除一个元素 e 使得 o.equals(e)。 如果此队列包含指定的元素(或等效地,如果此队列因调用而更改),则返回 true。

入队列 boolean offer(E e)

    public boolean offer(E e) 
        // 元素不为null
        checkNotNull(e);
        // 新建一个结点
        final Node<E> newNode = new Node<E>(e);

        // 无限循环遍历链表
        // 开启一个从tail指针开始遍历的循环,p指向当前疑似尾结点;
        for (Node<E> t = tail, p = t;;) 
            // 初始化q=p的下一个结点;
            Node<E> q = p.next;
            // 如果q==null,说明此刻p确实是尾结点,新结点应该插到后面;
            if (q == null) 
                // p is last node
                if (p.casNext(null, newNode))   // CAS将新结点链接到p结点上
                    //首次添加时,p 等于t,不进行尾结点更新,所以所尾结点存在滞后性  
                    //并发环境,可能存添加/删除,tail就更难保证正确指向最后结点。
                    if (p != t) // 和poll方法一样,如果插入的地方不是在tail处,才往后移,这个行为可以保证,tail后面最多还有一个结点;
                        // p不等t,CAS替换新结点为尾结点
                        casTail(t, newNode);  // cas操作安全移动tail指针。
                    return true;
                
                // Lost CAS race to another thread; re-read next
            
            else if (p == q) //p==q,也即p的next字段指向自身,说明p是一个脱离了链表的结点,需要找一个结点重新开始遍历。
                // 原来的尾结点与现在的尾结点是否相等,若相等,则p为head,否则,赋值为现在的尾结点
                // 当tail不执行最后结点时,如果执行出列操作,很有可能将tail也给移除了
                // 此时需要对tail结点进行复位,复位到head结点
                // 从head重新开始肯定正确,但是如果tail指针有更新过,那么从tail开始大概率可能效率更高。
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.按道理直接p=q跳到下一个结点就Ok了,但是代码里面做了这个判断(p != t && t != (t = tail)),如果p已经正常往后移动了一次,且tail发生了变化,那么从新的tail重新开始。为什么要加个(p!=t)的前置判断呢?我认为是为了提高效率,因为tail是valotile变量,读取有一定代价,当p==t的时候,作者认为再往后跳一下成功的概率挺高。
                p = (p != t && t != (t = tail)) ? t : q;
        
    

入队列主要考虑两件事:

  1. 新增新结点,设置为当前未结点的next

  2. 是否更新tail结点,若tail的next不为空,则将入队列的结点变为新的tail结点;若tail的next为空,则不更新tail结点。

    通过减少更新tail的次数,提高入队效率

出队列 E poll()

public E poll() 
    restartFromHead:
    for (;;) 
        // 开始一个类似遍历的for循环,初始化h,p为head结点,p是当前检查的结点,q是下一个结点。
        for (Node<E> h = head, p = h, q;;) 
            E item = p.item;
            // 如果p结点的数据不为空,那么p肯定就是真正的第一个结点,只要能将结点的数据字段(item)置为null,出队列就算成功);p.casItem(item, null)操作保证了并发poll的安全,如果有其他线程抢先一步执行成功,那么casItem操作会失败。
            if (item != null && p.casItem(item, null)) 
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                // p == h,说明刚出队列的p就是遍历开始的第一个结点,有2种情况
                // 1)没有并发poll,head指向了空结点,队列里面只有且仅有一个空结点,此时没有必要做head移动;
                // 2)有并发poll,p==h说明本线程在竞争中领先,那么交由那个落后的线程来做head移动。
                if (p != h) // hop two nodes at a time
                    // 如果p.next==null,说明p是尾结点,链表空了,head也指向p;否则head指向p.next。两种情况都是正确的。
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            
            // 无论p的item字段为null,还是在并发中竞争失败,我们都要往后遍历;于是q=p.next,如果q==null,说明链表空了,此时将head置为尾结点;当然,如果h==p是不需要执行的,updateHead里面有这个判断。
            else if ((q = p.next) == null) 
                updateHead(h, p);
                return null;
            
            // p==q实际上就是p==p.next,也即p的next指向自身,这说明p结点已经和链表断开了(参考第一结总结的结点属性),这种情况下,只能重新开始poll。
            else if (p == q)
                continue restartFromHead;
            else
                // 正常迭代下一个结点。
                p = q;
        
    

将首结点的指针从h转移到p,就是将节点h出队列,一旦出队列成功,就将h的next字段指向自身,防止那些出队列的节点仍然互相链接,妨碍垃圾收集。

updateHead(Node<E> h, Node<E> p)

    final void updateHead(Node<E> h, Node<E> p) 
        if (h != p && casHead(h, p))
            // h的next指向自己,从链表中断开
            h.lazySetNext(h);
    

h.lazySetNext(h)操作没有volatile的语义,有可能对其他线程暂时不可见。 假设poll方法执行到((q = p.next) == null)时,p节点已经被另外一个线程出队列了,但是本线程不知道,那么(p == q)的判断也可能失败,最终执行到 p = q,这种情况下程序仍然是正确的,不过会多遍历一些节点。(只能认为这种情况造成的性能损失低于volatile造成的性能损失)。

移除操作remove(Object o)

public boolean remove(Object o) 
    if (o != null) 
        Node<E> next, pred = null;
        for (Node<E> p = first(); p != null; pred = p, p = next) 
            boolean removed = false;
            E item = p.item;
            // 判断当前是不是空结点。
            if (item != null) 
                // 如果结点数据与输入参数相等,说明这是一个需要移除的结点,否则应该跳到下一个结点;
                if (!o.equals(item)) 
                    // 结点数据不相等,取出当前结点p的下一个结点,然后重新开始循环;
                    next = succ(p);
                    continue;
                
                // 结点数据相等,尝试清空数据;如果别的线程并发执行remove或poll,这一步操作可能失败;
                removed = p.casItem(item, null);
            
            // 执行到这一行说明,p是空结点,(本来就是空,或在L5被清空);取出next结点;
            next = succ(p);
            // 由于p是空结点,尝试将p的前继和后继相连;
            if (pred != null && next != null) // unlink
                pred.casNext(p, next);
            if (removed)
                return true;
        
    
    return false;

first()

Node<E> first() 
    restartFromHead:
    // 自旋
    for (;;) 
        // 开始一个类似遍历的for循环,初始化h,p为head结点,p是当前检查的结点,q是下一个结点。
        for (Node<E> h = head, p = h, q;;) 
            // p中是否存在元素
            boolean hasItem = (p.item != null);
            // 若有元素或next为null则更新首结点
            if (hasItem || (q = p.next) == null) 
                updateHead(h, p);
                // 若结点中有元素则返回此结点,链表中存在包含元素的结点
                // 若结点中没有元素则返回null,链表中没有实际数据。
                return hasItem ? p : null;
            
            // p==q实际上就是p==p.next,也即p的next指向自身,这说明p结点已经和链表断开了,这种情况下,只能重新开始first。
            else if (p == q)
                continue restartFromHead;
            else
                // 正常迭代下一个结点。
                p = q;
        
    

isEmpty()

public boolean isEmpty() 
    // 通过first的返回值判断链表是否存在实际数据
    return first() == null;

size()

    public int size() 
        int count = 0;
        // 从实际的首结点开始遍历链表,计算链表中实际包含数据的结点数
        for (Node<E> p = first(); p != null; p = succ(p))
            if (p.item != null)
                // Collection.size() spec says to max out
                // 链表最大值是Integer.MAX_VALUE
                if (++count == Integer.MAX_VALUE)
                    break;
        return count;
    

如果 p.next 已链接到 self,则返回首结点。否则返回下一个结点。

    final Node<E> succ(Node<E> p) 
        Node<E> next = p.next;
        return (p == next) ? head : next;
    

状态说明

在整个生命周期内,算法保持以下属性:

  1. 链表里面至少会有一个节点,数据字段为null的节点是空节点;
  2. 顺着HEAD指针肯定能找到的真正的头节点,并能访问到所有节点;
  3. TAIL指针不一定指向有效节点,更不能保证指向真正的尾节点,但是它大部分情况下指向尾节点或接近尾节点,因此可以提高效率;
  4. 和队列断开的节点,next字段指向自身;
  5. 对于入了队列的节点(哪怕又出了队列),只有尾节点的next字段才等于null。


参考

作者:longhuihu 链接:https://www.jianshu.com/p/ce6108e4b2c4

以上是关于JUC系列并发容器之ConcurrentLinkedQueue(JDK1.8版)的主要内容,如果未能解决你的问题,请参考以下文章

JUC系列并发容器之ConcurrentLinkedQueue(JDK1.8版)

JUC系列并发容器之ConcurrentHashMap(JDK1.8版)

JUC系列并发容器之ConcurrentHashMap(JDK1.8版)扩容图解说明

Java并发编程系列之三JUC概述

JUC系列01之大话并发

Java并发编程系列之三JUC概述