JUC系列并发容器之ConcurrentLinkedQueue(JDK1.8版)
Posted 顧棟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC系列并发容器之ConcurrentLinkedQueue(JDK1.8版)相关的知识,希望对你有一定的参考价值。
ConcurrentLinkedQueue
文章目录
类图
组成
数据结构
一个头结点和一个尾结点
每个结点Node包含item和下一个结点next引用,默认情况下,tail等于head,元素为null。
内部类
private static class Node<E>
// 元素
volatile E item;
// 下一个结点引用
volatile Node<E> next;
/**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext.
*/
// 构造函数
Node(E item)
// 设置item的值
UNSAFE.putObject(this, itemOffset, item);
// 设置item的值
boolean casItem(E cmp, E val)
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
void lazySetNext(Node<E> val)
// 设置next域的值,并不会保证修改对其他线程立即可见
UNSAFE.putOrderedObject(this, nextOffset, val);
// 比较并替换next域的值
boolean casNext(Node<E> cmp, Node<E> val)
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
// item域的偏移量
private static final long itemOffset;
// next域的偏移量
private static final long nextOffset;
static
try
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
catch (Exception e)
throw new Error(e);
类的属性
// 链表首结点
private transient volatile Node<E> head;
// 链表尾结点
private transient volatile Node<E> tail;
类的构造函数
默认无参构造函数,创建一个空的ConcurrentLinkedQueue。
public ConcurrentLinkedQueue()
// 初始化头结点与尾结点
head = tail = new Node<E>(null);
将给的集合c转换成ConcurrentLinkedQueue,按照此 collection 迭代器的遍历顺序来添加元素。
public ConcurrentLinkedQueue(Collection<? extends E> c)
Node<E> h = null, t = null;
for (E e : c) //遍历c集合
// 检查元素不为null
checkNotNull(e);
// 新建node结点
Node<E> newNode = new Node<E>(e);
// 首结点为空
if (h == null)
h = t = newNode;
else
// 尾插法
// 直接头结点的next域
t.lazySetNext(newNode);
// 重新赋值头结点
t = newNode;
// 头结点为null
if (h == null)
// 新结头结点与尾结点
h = t = new Node<E>(null);
head = h;
tail = t;
核心方法
方法名 | 描述 |
---|---|
boolean add(E e) | 在此队列的尾部插入指定元素。 由于队列是无界的,因此该方法永远不会抛出IllegalStateException 或返回false 。 |
void updateHead(Node<E> h, Node<E> p) | 尝试 CAS 前往 p。 如果成功,将旧头重新指向自身作为succ() 的哨兵,如下所示。 |
Node<E> succ(Node<E> p) | 如果p.next 已链接到self ,则返回p 的后继结点或头结点,这仅在使用现在不在列表中的过时指针遍历时才为真。 |
boolean offer(E e) | 在此队列的尾部插入指定元素。 由于队列是无界的,因此此方法永远不会返回false 。 |
E poll() | 此方法移除并返回此ConcurrentLinkedQueue的头部;如果此队列为空,则返回null。 |
E peek() | 此方法返回此ConcurrentLinkedQueue的头部,而不删除它 |
Node<E> first() | 返回列表中的第一个活动(未删除)结点,如果没有,则返回 null。 这是 poll/peek 的另一种变体; 这里返回第一个结点,而不是元素。 我们可以让 peek() 成为 first() 的包装器,但这会花费额外的 volatile 读取项,并且需要添加一个重试循环来处理输给并发 poll() 的竞争的可能性。 |
int size() | 返回此队列中的元素数。 如果此队列包含多个 Integer.MAX_VALUE 元素,则返回 Integer.MAX_VALUE。 请注意,与大多数集合不同,此方法不是恒定时间操作。 由于这些队列的异步特性,确定当前元素的数量需要 O(n) 遍历。 此外,如果在此方法执行期间添加或删除元素,则返回的结果可能不准确。 因此,这种方法在并发应用程序中通常不是很有用。 |
boolean remove(Object o) | 从此队列中移除指定元素的单个实例(如果存在)。 更正式地说,如果该队列包含一个或多个这样的元素,则删除一个元素 e 使得 o.equals(e)。 如果此队列包含指定的元素(或等效地,如果此队列因调用而更改),则返回 true。 |
入队列 boolean offer(E e)
public boolean offer(E e)
// 元素不为null
checkNotNull(e);
// 新建一个结点
final Node<E> newNode = new Node<E>(e);
// 无限循环遍历链表
// 开启一个从tail指针开始遍历的循环,p指向当前疑似尾结点;
for (Node<E> t = tail, p = t;;)
// 初始化q=p的下一个结点;
Node<E> q = p.next;
// 如果q==null,说明此刻p确实是尾结点,新结点应该插到后面;
if (q == null)
// p is last node
if (p.casNext(null, newNode)) // CAS将新结点链接到p结点上
//首次添加时,p 等于t,不进行尾结点更新,所以所尾结点存在滞后性
//并发环境,可能存添加/删除,tail就更难保证正确指向最后结点。
if (p != t) // 和poll方法一样,如果插入的地方不是在tail处,才往后移,这个行为可以保证,tail后面最多还有一个结点;
// p不等t,CAS替换新结点为尾结点
casTail(t, newNode); // cas操作安全移动tail指针。
return true;
// Lost CAS race to another thread; re-read next
else if (p == q) //p==q,也即p的next字段指向自身,说明p是一个脱离了链表的结点,需要找一个结点重新开始遍历。
// 原来的尾结点与现在的尾结点是否相等,若相等,则p为head,否则,赋值为现在的尾结点
// 当tail不执行最后结点时,如果执行出列操作,很有可能将tail也给移除了
// 此时需要对tail结点进行复位,复位到head结点
// 从head重新开始肯定正确,但是如果tail指针有更新过,那么从tail开始大概率可能效率更高。
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.按道理直接p=q跳到下一个结点就Ok了,但是代码里面做了这个判断(p != t && t != (t = tail)),如果p已经正常往后移动了一次,且tail发生了变化,那么从新的tail重新开始。为什么要加个(p!=t)的前置判断呢?我认为是为了提高效率,因为tail是valotile变量,读取有一定代价,当p==t的时候,作者认为再往后跳一下成功的概率挺高。
p = (p != t && t != (t = tail)) ? t : q;
入队列主要考虑两件事:
-
新增新结点,设置为当前未结点的next
-
是否更新tail结点,若tail的next不为空,则将入队列的结点变为新的tail结点;若tail的next为空,则不更新tail结点。
通过减少更新tail的次数,提高入队效率
出队列 E poll()
public E poll()
restartFromHead:
for (;;)
// 开始一个类似遍历的for循环,初始化h,p为head结点,p是当前检查的结点,q是下一个结点。
for (Node<E> h = head, p = h, q;;)
E item = p.item;
// 如果p结点的数据不为空,那么p肯定就是真正的第一个结点,只要能将结点的数据字段(item)置为null,出队列就算成功);p.casItem(item, null)操作保证了并发poll的安全,如果有其他线程抢先一步执行成功,那么casItem操作会失败。
if (item != null && p.casItem(item, null))
// Successful CAS is the linearization point
// for item to be removed from this queue.
// p == h,说明刚出队列的p就是遍历开始的第一个结点,有2种情况
// 1)没有并发poll,head指向了空结点,队列里面只有且仅有一个空结点,此时没有必要做head移动;
// 2)有并发poll,p==h说明本线程在竞争中领先,那么交由那个落后的线程来做head移动。
if (p != h) // hop two nodes at a time
// 如果p.next==null,说明p是尾结点,链表空了,head也指向p;否则head指向p.next。两种情况都是正确的。
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
// 无论p的item字段为null,还是在并发中竞争失败,我们都要往后遍历;于是q=p.next,如果q==null,说明链表空了,此时将head置为尾结点;当然,如果h==p是不需要执行的,updateHead里面有这个判断。
else if ((q = p.next) == null)
updateHead(h, p);
return null;
// p==q实际上就是p==p.next,也即p的next指向自身,这说明p结点已经和链表断开了(参考第一结总结的结点属性),这种情况下,只能重新开始poll。
else if (p == q)
continue restartFromHead;
else
// 正常迭代下一个结点。
p = q;
将首结点的指针从h转移到p,就是将节点h出队列,一旦出队列成功,就将h的next字段指向自身,防止那些出队列的节点仍然互相链接,妨碍垃圾收集。
updateHead(Node<E> h, Node<E> p)
final void updateHead(Node<E> h, Node<E> p)
if (h != p && casHead(h, p))
// h的next指向自己,从链表中断开
h.lazySetNext(h);
h.lazySetNext(h)
操作没有volatile
的语义,有可能对其他线程暂时不可见。 假设poll方法执行到((q = p.next) == null)
时,p节点已经被另外一个线程出队列了,但是本线程不知道,那么(p == q)
的判断也可能失败,最终执行到 p = q
,这种情况下程序仍然是正确的,不过会多遍历一些节点。(只能认为这种情况造成的性能损失低于volatile造成的性能损失)。
移除操作remove(Object o)
public boolean remove(Object o)
if (o != null)
Node<E> next, pred = null;
for (Node<E> p = first(); p != null; pred = p, p = next)
boolean removed = false;
E item = p.item;
// 判断当前是不是空结点。
if (item != null)
// 如果结点数据与输入参数相等,说明这是一个需要移除的结点,否则应该跳到下一个结点;
if (!o.equals(item))
// 结点数据不相等,取出当前结点p的下一个结点,然后重新开始循环;
next = succ(p);
continue;
// 结点数据相等,尝试清空数据;如果别的线程并发执行remove或poll,这一步操作可能失败;
removed = p.casItem(item, null);
// 执行到这一行说明,p是空结点,(本来就是空,或在L5被清空);取出next结点;
next = succ(p);
// 由于p是空结点,尝试将p的前继和后继相连;
if (pred != null && next != null) // unlink
pred.casNext(p, next);
if (removed)
return true;
return false;
first()
Node<E> first()
restartFromHead:
// 自旋
for (;;)
// 开始一个类似遍历的for循环,初始化h,p为head结点,p是当前检查的结点,q是下一个结点。
for (Node<E> h = head, p = h, q;;)
// p中是否存在元素
boolean hasItem = (p.item != null);
// 若有元素或next为null则更新首结点
if (hasItem || (q = p.next) == null)
updateHead(h, p);
// 若结点中有元素则返回此结点,链表中存在包含元素的结点
// 若结点中没有元素则返回null,链表中没有实际数据。
return hasItem ? p : null;
// p==q实际上就是p==p.next,也即p的next指向自身,这说明p结点已经和链表断开了,这种情况下,只能重新开始first。
else if (p == q)
continue restartFromHead;
else
// 正常迭代下一个结点。
p = q;
isEmpty()
public boolean isEmpty()
// 通过first的返回值判断链表是否存在实际数据
return first() == null;
size()
public int size()
int count = 0;
// 从实际的首结点开始遍历链表,计算链表中实际包含数据的结点数
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
// 链表最大值是Integer.MAX_VALUE
if (++count == Integer.MAX_VALUE)
break;
return count;
如果 p.next 已链接到 self,则返回首结点。否则返回下一个结点。
final Node<E> succ(Node<E> p)
Node<E> next = p.next;
return (p == next) ? head : next;
状态说明
在整个生命周期内,算法保持以下属性:
- 链表里面至少会有一个节点,数据字段为null的节点是空节点;
- 顺着HEAD指针肯定能找到的真正的头节点,并能访问到所有节点;
- TAIL指针不一定指向有效节点,更不能保证指向真正的尾节点,但是它大部分情况下指向尾节点或接近尾节点,因此可以提高效率;
- 和队列断开的节点,next字段指向自身;
- 对于入了队列的节点(哪怕又出了队列),只有尾节点的next字段才等于null。
参考
作者:longhuihu 链接:https://www.jianshu.com/p/ce6108e4b2c4
以上是关于JUC系列并发容器之ConcurrentLinkedQueue(JDK1.8版)的主要内容,如果未能解决你的问题,请参考以下文章
JUC系列并发容器之ConcurrentLinkedQueue(JDK1.8版)
JUC系列并发容器之ConcurrentHashMap(JDK1.8版)