3.3.4深度剖析ConcurrentLinkedQueue

Posted anxbb

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了3.3.4深度剖析ConcurrentLinkedQueue相关的知识,希望对你有一定的参考价值。

队列、链表之类的数据结构及其常用。Java中,ArrayList和Vector都是使用数组作为其内部实现。两者最大的不同在于:Vector是线程安全的,而ArrayList不是。此外LinkedList使用链表的数据结构实现了List,但并不是线程安全的,就像之前包装HashMap,这里我们可以使用Collections.synchronizedList()来包装任意List。此时,生成的List对象就是线程安全的。

public static List<String> l = Collections.synchronizedList(new LinkedList<String>());

 

高效读写的队列:深度剖析ConcurrentLinkedQueue

 JDK中,提供了一个ConcurrentLinkedQueue类来实现高并发的队列。这个队列使用链表作为其数据结构。这个类算是高并发环境中性能最好的队列。高性能是因为其内部复杂的实现。

链表内的节点:

    private static class Node<E> {
        volatile E item;
        volatile Node<E> next;
    ...

 

其实这是一个静态内部类,item是来表示元素的。next表示当前Node的下一个元素。 
对Node进行操作时,使用了CAS操作:

//表示设置当前Node的item值。第一个参数为期望值,第二个参数为设置目标值。当当前值等于期望值时(就是没有被其他人改过),就会将目标设置为val。
boolean casItem(E cmp, E val) {
    return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}

//和上个方法类似,是用来设置next字段。
boolean casNext(Node<E> cmp, Node<E> val) {
    return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

ConcurrentLinkedQueue内部有两个重要的字段:head和tail,分别表示链表的头部和尾部,当然都得是Node类型。对于head来说,它永远不会为null,并且通过head和succ()后继方法一定能完整地遍历整个链表。对于tail来说,它自然表示队列的末尾。 
以下是构造函数:

    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }

但是ConcurrentLinkedQueue的内部实现非常复杂,它允许在运行时链表处于多个不同的状态。以tail为例,一般来说我们期望tail总是作为链表的末尾,但实际上tail的更新并不是及时的,可能会产生拖延现象。就是说,有可能指向的并不是真正的尾巴,真正的可能在后面一个或者多个。

下面就是向队列中添加元素的offer方法:

/**
     * Inserts the specified element at the tail of this queue.
     * 在队列后插入一个元素
     * As the queue is unbounded, this method will never return {@code false}.
     * 因为队列是无限大的,这个方法永远不会回false
     *
     * @return {@code true} (as specified by {@link Queue#offer})
     * 返回true,
     * @throws NullPointerException if the specified element is null
     * 如果某元素为空则抛出空指针异常
     */
    public boolean offer(E e) {
        checkNotNull(e);//检查e是否为空
        final Node<E> newNode = new Node<E>(e);//创建新节点

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) { //1
                // p is last node. p是最后一个节点
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time
                        //每两次,更新一下tail
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next CAS竞争失败,再次尝试。
            }
            else if (p == q)//2
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                //遇到哨兵节点,从head开始遍历。
                //但如果tail被修改,则使用tail(因为可能被修改正确了)
                p = (t != (t = tail)) ? t : head;
            else           //3
                // Check for tail updates after two hops.取下一个节点或者最后一个节点
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

这个方法没有任何锁操作。线程安全完全由CAS操作和队列的算法来保证。整个算法的核心是for循环,这个循环没有出口,知道尝试成功,这也符合CAS操作的流程。 
当第一次加入元素的时候,由于队列为空,因此p.next为null。程序进入1处,程序将p的next节点赋值为newNode,也就是将新的元素加入到队列中。此时p==t成立,也就是不会执行casTail()代码更新tail末尾。如果casNext()成功,程序直接返回,如果失败,则再进行一次循环尝试,直到成功。因此,增加一个元素后,tail并不会被更新。 
当程序尝试加入第二个元素时,由于此时t还在head的位置上,因此p.next指向实际的第一个元素,因此1处的q!=null,这表示q不是最后的节点。由于往队列中增加元素需要最后一个节点,因此,循环开始中查找最后一个节点。于是,程序进入3处,获得最后一个节点。此时,p实际上是指向链表中的第一个元素,它的next是null,故在第二个循环的时候,进入1处,p更新自己的next,让它指向新加入的节点。如果成功,由于此时p!=t成功,则会更新t所在位置,将t移到链表最后。 
2处处理了p==q的情况。这种情况是由于遇到了哨兵节点导致的。所谓哨兵节点,就是next指向自己的节点。这种节点在队列中的存在价值不大,主要表示要删除的节点或者空节点。当遇到哨兵节点的时候,由于无法通过next取得后续的节点,因此很可能直接返回head,期望通过从链表头部开始遍历,进一步查找到链表末尾。但一旦发生在执行过程中,tail被其他线程修改的情况,则进行一次“打赌”,使用新的tail作为链表末尾,这样就避免了重新查找tail的开销。

p = (p != t && t != (t = tail)) ? t : q;

这段代码中:“!=”并不是原子操作,它是可以被中断的。也就是说,在执行“!=”时,程序会先取得t的值,在执行t=tail,并取得新的t值。然后比较这两个值是否相等。在单线程时,t!=t显然是不成立的。但是在并发环境中,有可能在获得左边的t值后,右边的t值被其他线程修改。这样t!=t就可能成立。这就表示tail在中途被其他线程篡改。这时,我们就可以用新的tail作为链表末尾,也就是等式右边的t。如果tail没有被修改,则返回head,要求从头部开始,重新查找尾部。 
技术分享图片
技术分享图片








以上是关于3.3.4深度剖析ConcurrentLinkedQueue的主要内容,如果未能解决你的问题,请参考以下文章

深度优先搜索剖析

Flutter Dio源码分析--深度剖析

深度剖析,从普通时钟系统到各种授时方式

libevent源码深度剖析

深度剖析智能指针

mybatis源码级别深度剖析