源码分析-SynchronousQueue
Posted 千念飞羽
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了源码分析-SynchronousQueue相关的知识,希望对你有一定的参考价值。
SynchronousQueue
SynchronousQueue算法看的比较糊涂,不过时间拖得太久了,这里先写成这样,以后有了更好的理解之后再修改吧.如果有人看的有问题的地方.麻烦提醒我.
doc概述
SynchronousQueue作为阻塞队列的时候,对于每一个take的线程会阻塞直到有一个put的线程放入元素为止,反之亦然。在SynchronousQueue内部没有任何存放元素的能力。所以类似peek操作或者迭代器操作也是无效的,元素只能通过put类操作或者take类操作才有效。通常队列的第一个元素是当前第一个等待的线程。如果没有线程阻塞在该队列则poll会返回null。从Collection的视角来看SynchronousQueue表现为一个空的集合。
SynchronousQueue相似于使用CSP和Ada算法(不知道怎么具体指什么算法),他非常适合做交换的工作,生产者的线程必须与消费者的线程同步以传递某些信息、事件或任务
SynchronousQueue支持公平锁,如果是公平锁的话可以保证当前第一个队首的线程是等待时间最长的线程,这时可以视SynchronousQueue为一个FIFO队列。
源码概述
Synchronous是双栈双队列算法的扩展算法介绍LIFO栈用于非公平模式,FIFO队列用于公平模式。两者的性能是相当的,通常来说FIFO用于在竞争激烈的环境中维持更高的吞吐量,而LIFO则在通用的应用中维护更高线程局部性。
一个双端队列(栈也是相似的)在任意给定的时刻,或者持有一个数据(由put操作提供的)或者持有一个请求(由take操作提供的)或者为空。SynchronousQueue最有趣的特性是在无锁的情况下任何操作都可以执行其相应的功能。
队列和栈都拓展了抽象类Transferer,这个类定义了一个transfer方法,这个方法执行put或者take操作。这两个操作被整合为一个操作,因为在双端队列中这两者是对称的,所以可以近似的组合在一起。
SynchronousQueue在算法实现上和上文中提到的论文中的算法有一些细微的不同主要表现在三个方面:
- 原算法使用位标记指针,但是这里使用节点的位的模,从而产生的一系列改进
- SynchronousQueuee必须阻塞线程等待至任务被完成。
- 通过支持时间限和中断来实现取消,包括从list中清除节点和线程,以避免垃圾存留和内存消耗。
阻塞通常是由LockSupport的park和unpark方法完成的处理,除了下一个将要被处理的节点,先使用自旋一段时间。在高并发状态下,自旋可以显著的提高吞吐量。在低并发的情况下自旋量对性能的提升比较小不容易被察觉出来。
在队列和栈中clear操作使用不同的方式。在队列中,当一个任务取消时,我们几乎可以以O(1)时间移除一个元素(模重试操作(modulo retries)进行一致性检查)。但是如果当前节点被固定在尾部他可能需要等待到其他后续节点的取消才行。在栈中,我们需要最大O(n)时间去遍历所有操作以确信杀出了节点。但是这个操可以在其他线程访问栈的同时进行。
虽然垃圾回收系统会处理绝大多数问题,但是对于非阻塞节点的问题依然要非常小心的对待。尤其在对待持有对象引用的问题,有可能导致其他节点会比长期阻塞。将节点内容是为null与主算法冲突,通常情况下可以将节点内指针指向其自身。这个问题对于栈节点来说并不这么严重因为在栈中,阻塞的线程并不挂在旧的头结点上,但是队列中节点的引用必须被迅速的遗忘以避免出现一致性问题。
域
SynchronousQueue是一个比较少见的类,其本身的域和方法都没有什么特别的主要的内容都是通过内部类来实现。
域主要可以分为以下几大类:
设定时间限的类:
static final int NCPUS = Runtime.getRuntime().availableProcessors();
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
static final int maxUntimedSpins = maxTimedSpins * 16;
static final long spinForTimeoutThreshold = 1000L;
这几个主要是用来设定自旋时间限的,这里首先解释一下自旋时间限有什么用。
首先阻塞是代价非常大的操作,要保存当前线程的很多数据,并且要切换上下文,等线程解释阻塞的时候还有切换回来。所以通常来说在阻塞之前都先自旋,自旋其实就是在一个循环里不停的检测是否有效,当然这要设定时间限。如果在时间限内通过自旋完成了操作。那就不需要去阻塞这也自然是最好的提高了响应速度。但是如果自旋时间限内还是能没能完成操作那就只有阻塞了。
java中大量运用了这样的技术。凡是有阻塞的操作都会这样做,包括内置锁在内,内置锁其实也是这样的,内置锁分为偏向锁,轻量级锁和重量级锁,其中轻量级锁其实就是自旋来替代阻塞。
当然需要自旋多长时间。这是一个根据不同情况来设定的值并没有一个准确的结论,通常来说竞争越激烈这样多自旋一段时间总是好的,效果也越明显,但是自旋时间过长会浪费cpu时间所以,设定时间还是一个很依靠经验的值。
在这里其实是这样做的,首先看一下当前cpu的数量,NCPUS
然后分两种情况一种是设定了时间限的自旋时间
如果设定了时间限则使用maxTimedSpins,如果NCPUS数量大于等于2则设定为为32否则为0,既一个CPU时不自旋;这是显然了,因为唯一的cpu在自旋显然不能进行其他操作来满足条件。
如果没有设定时间限则使用maxUntimedSpins,如果NCPUS数量大于等于2则设定为为32*16,否则为0;
这里的两个时间限实际上表示的是自旋次数主要用在TransferQueue和TransfStack的awaitFulfill中,后面再详细解释。
另外还有一个参数spinForTimeoutThreshold 这个参数是为了防止自定义的时间限过长,而设置的,如果设置的时间限长于这个值则取这个spinForTimeoutThreshold 为时间限。这是为了优化而考虑的。这个的单位为纳秒。后文会提到。
这里另外插一句,如果我们自己去设置时间限该怎么设计,首先因为大部分java代码都有类似的阈值来防止过长的时间限所以不用太担心。不过要问具体数字的话,我曾看到过那里有资料曾说过大部分是时间限大概为数千操作的时间,也就是算一下循环内有多少操作,大概总共操作3000次进行阻塞就好。来源可不可靠就不好说啦- -。。
第二类域就只有一个:
private transient volatile Transferer transferer;
源码概述中曾经说过,SynchronousQueue实际上内部有两个类TransferStack和TransferQueue,他们都继承自Transfer抽象类。这里就是这两具体实现的引用。几乎所有的方法都委托给这个transfer来处理
第三类都是序列化的域了
private ReentrantLock qlock;
private WaitQueue waitingProducers;
private WaitQueue waitingConsumers;
这几个类都是用来做序列化的,但是这方面知识我不是很了解。而在SynchronousQueue内部写的代码也能难理解。之后待后续补充了,SynchronousQueue序列化是需要上锁的,而我不明白的一点是既然Synchronous内部没有任何东西如何去序列化。
内部类
首先一个抽象类Transfer内部一个抽象方法transfer
abstract static class Transferer
abstract Object transfer(Object e, boolean timed, long nanos);
TransferStack
首先在说Stack内部实现之前我先说一下大概是怎么回事,然后在去看代码比较好。之前源码概述里写的那个是对源码里英文的翻译,所以比较生硬,所以这里怎么形象的描述一下TransferStack是怎么工作的。一个典型的例子就是像俄罗斯方块一样。
首先一个TransferStack有三个状态,
- 可能是一个request,就是获取队列的请求对应于take操作,
- 也可能是一个放入data的操作对应于put,
- 或者是完成了匹配操的fulfill对应于一个交付操作,
那栈的状态就好像玩俄罗斯方块一样,只有三种情况
- 当前传入的模块是域栈顶模块相同(或者栈为空也是一样),对应于t0。
- 当前传入的模块与栈顶模块匹配,达成交付操作,对应于t1。
- 当前栈顶的模块是匹配模块需要删除,对应于t2。
TransfStack的域
TransferStack的域分为两大类:
一类是定义状态的域
static final int REQUEST = 0;
static final int DATA = 1;
static final int FULFILLING = 2;
定义TransferStack的三种状态。
一类是获得一个unsafe类。和栈顶元素的偏移量,还有一段设置偏移量的地址。
private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset;
static
try
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = TransferStack.class;
headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head"));
catch (Exception e)
throw new Error(e);
首先unsafe是一个进行内存操作的类,CAS是最常用的操作之一。
比较并替换,这是基本的同步原语,CAS和锁有什么区别。
简单的说CAS是乐观锁。就是假定没有冲突,先进行尝试,如果成功则返回true,继续后续程序,如果失败了就返回false。很多时候所说的无锁编程指的就是使用CAS,或者原子性操作而不使用锁,这样效率会高一些。因为CAS一般认为是CPU操作,也就是说相当于一步就完成了。(实际上似乎还需要进行缓存操作等等)
而通常所说的Synchronous或者Lock一般都悲观锁,先独占然后进行尝试。另外,CAS和自旋锁还是有区别的,自旋锁是尝试获得独占性的锁,而CAS是尝试进行内存操作。
响应速度快但是如果一直没办法完成CAS也是有危险的,所以这里是unsafe,所以我们都用锁而很少直接用CAS。unsafe的原意主要是说这个类在使用的时候即使出错了也不会有异常抛出。
个人理解,所谓的无锁操作未必是比锁更快的,而是各有各的优势,泛泛的说,在低并发情况下无锁可能会更快,但是在高并发环境下使用锁可能效率会更高。但是有些算法经过精巧的设计可以使用无锁算法替代锁的算法,而且在高并发条件下也是成立的,这样才算是好的无锁算法。但是在更多的情况下是没有办法设计出这么精巧的算法的。而显然SynchronousQueue显然属于精巧的算法。。
这里有关于CAS说的很好的博客非阻塞同步算法与CAS(Compare and Swap)无锁算法推荐一下。
内部类-SNode
静态内部类及Unsafe的用法:
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long matchOffset;
private static final long nextOffset;
static
try
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = SNode.class;
matchOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("match"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
catch (Exception e)
throw new Error(e);
unsafe时内存操作,这里主要用到的是UNSAFE.compareAndSwapObject(Object,long,Object,Object);
这里正好整理下unsafe.compareAndSwapObject(Object,long,Object,Object);的使用方式。
看下这几个参数分别什么意思。CAS是对内存的操作,第一个Object可以理解为首地址,第二long可以理解为偏移地址,第三个参数是当前线程期望的内部域的值,最后一个参数是希望变成的值。从java的角度来看第一个Object其实就是你当前希望CAS类的实例对象,偏移地址则需要使用反射机制计算出来,但是通常来说都有个大概的值,比如如果是int的类型通常是4,但是根据反射计算是最好的,所以unsafe通常来说都是需要结合反射使用的。
首先对于一个类,如果想要使用CAS,需要定义一个unsafe的类。通常来说需要使用UNSAFE = sun.misc.Unsafe.getUnsafe();来获得一个静态类。假如需要CAS某个类的私有域,则需要使用静态内部类来声明其偏移地址。比如这里需要CAS内部类SNode的match和next,则需要使用反射的方法获得其偏移地址。
Snode的域:
volatile SNode next; // next node in stack
volatile SNode match; // the node matched to this
volatile Thread waiter; // to control park/unpark
Object item; // data; or null for REQUESTs
int mode;
next表示下一个节点
match表示与当前节点匹配的节点
waiter表示当前节点代表的线程
Object item表示当前的放入的item或者请求则为null
int表示当前的模式,REQUEST、DATA和FULFILLING。
next、match和waiter使用volatile变量因为可能涉及多线操作。为何这里可以不用锁,因为volatile可以提供可见性,但是其符合操作表示院子性的,所以可以用来做通知。比如在当前线程的对volatile变量的操作对其他线程也是可见的。
但是这里的Object item和int mode并不使用volatile。因为其写操作必定在其他原子操作之前写,在读操作之后。所以可以保证其操作的有效性。
#SNode的方法
casNext
tryMatch
tryCancel
casHead
主要就是这几个方法。cas是替换当前节点的next节点。
trycancel是取消当前操作。就是把当前的match匹配从null变成this。当前节点是null则说明当前节点的任务还没有完成,换成是this则说明当前this匹配其自身在这样的条件下认为任务是取消了。
boolean tryMatch(SNode s)
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s))
Thread w = waiter;
if (w != null) // waiters need at most one unpark
waiter = null;
LockSupport.unpark(w);
return true;
return match == s;
这里只看一下tryMatch匹配。如果当前match不为null则说明当前任务完成或者取消了,CAS(null->s)则尝试匹配这两个任务。如果成功则返回true并用LockSupport.unpark(w)去解锁当前的线程。并返回true。否则则返回match==s。
为何这里不直接返回false。
原因还不太清楚,有可能当前线程已经匹配过了节点。
transfer相关方法
当然最核心的还是一个transfer方法,transfer方法就是实现SynchronousQueue最核心的方法。简单的说就是放入SNode节点。
代码如下,第一个object是放入的元素可能是item,也可能是null,第二个说明是否使用时间限,第三个参数是使用ns来计算的时间限。
Object transfer(Object e, boolean timed, long nanos)
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;
for (;;)
SNode h = head;
if (h == null || h.mode == mode) // empty or same-mode
if (timed && nanos <= 0) // can't wait
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else
return null;
else if (casHead(h, s = snode(s, e, h, mode)))
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) // wait was cancelled
clean(s);
return null;
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return (mode == REQUEST) ? m.item : s.item;
else if (!isFulfilling(h.mode)) // try to fulfill
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode)))
for (;;) // loop until matched or waiters disappear
SNode m = s.next; // m is s's match
if (m == null) // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
SNode mn = m.next;
if (m.tryMatch(s))
casHead(s, mn); // pop both s and m
return (mode == REQUEST) ? m.item : s.item;
else // lost match
s.casNext(m, mn); // help unlink
else // help a fulfiller
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
整个程序在一个循环内,只有满足情况才能跳出循环。
大概分三种情况。
- 一种情况是当前栈为空或者当前模式相同的节点遇到一起。
- 第二种情况是尝试匹配当前的节点,先将当前节点s如栈,如果失败(栈顶节点可能会被其他线程匹配),则循环进行匹配。
- 第三种情况是辅助方法,清除匹配成功的节点,或者当节点所属线程消失后将其移除栈。
这里再看下awaitFulfill的代码:
在awaitFulfill中spin的特点有意思这里设置的其实是自旋次数
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
首先为什么要判断head.next==s。这里我猜测是因为希望只对head的next节点来进行自旋,如果不是头结点就不需要自旋而将spin设置为0。这样当进入自旋或者阻塞的判断内部,则进入阻塞。
其次对于设定了时间限和设时间限自旋是不一样的,如果没有设定时间限,则进行少量的自旋,也就是16次自旋。如果设定了时间限,怎根据自旋限的大小来进行判断,如果自旋时间小于时间限的阈值spinForTimeoutThreshold,即1000,则自旋16*32=512次(当cpu次数小于2)。如果设定的时间大于1000则进行待时限的阻塞。
clean(SNode s)
clean没太看懂。
s.item = null; // forget item
s.waiter = null; // forget thread
SNode past = s.next;
if (past != null && past.isCancelled())
past = past.next;
// Absorb cancelled nodes at head
SNode p;
while ((p = head) != null && p != past && p.isCancelled())
casHead(p, p.next);
// Unsplice embedded nodes
while (p != null && p != past)
SNode n = p.next;
if (n != null && n.isCancelled())
p.casNext(n, n.next);
else
p = n;
清理s节点不仅仅清理s节点而且还负责将将清理其他的取消的节点和空节点。并且这里还遍历了栈节点,首先尝试设置栈顶节点,然后遍历栈,将取消的节点全部删去。这里还先找到past节点,past是s的后续非取消节点。这里只遍历p到past或者null。原因还是不是很理解。
TransferQueue
QNode
Qnode和Snode基本是是相同的
首先看非静态内部域:
volatile QNode next; // next node in queue
volatile Object item; // CAS'ed to or from null
volatile Thread waiter; // to control park/unpark
final boolean isData;
next是节点。item是当前入栈或者出站的请求或者应答数据
waiter是当前节点的线程。
isData是表示当前是数据还是请求。这里没有使用Stack中模的方法。不过也是类似的。
在看静态域
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static
try
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = QNode.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
catch (Exception e)
throw new Error(e);
unsafe没什么好说的,itemOffset是结点的内容的偏移量,nextOffset是该节点后续节点的偏移量。
方法没有什么特殊的地方。基本上就是CAS操作的委托。
不过还有两点问题,第一个问题是如果item指向其自身,则说明当前节点取消了,第二个问题是如果是next指向其自身,则说明当前当前节点出队。
TransferQueue的域
TransferQueue的的非静态域
transient volatile QNode head;
transient volatile QNode tail;
transient volatile QNode cleanMe;
head和tail是首尾节点是很好理解的,cleanMe指的是已经被取消但是还没有出列的节点。cleaMe似乎只在clean方法中使用。
TransferQueue的transfer方法
Object transfer(Object e, boolean timed, long nanos)
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);
for (;;)
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
continue; // spin
if (h == t || t.isData == isData) // empty or same-mode
QNode tn = t.next;
if (t != tail) // inconsistent read
continue;
if (tn != null) // lagging tail
advanceTail(t, tn);
continue;
if (timed && nanos <= 0) // can't wait
return null;
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s)) // failed to link in
continue;
advanceTail(t, s); // swing tail and wait
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) // wait was cancelled
clean(t, s);
return null;
if (!s.isOffList()) // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
return (x != null) ? x : e;
else // complementary-mode
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null) ? x : e;
transfer方法这里还是用图来描述比较好。总的来说也是一个循环,循环内就只有两种情况一种是空队列或者队头模相同,一种是需要入队列尾。这个整体比TransferStack要简单清晰一些。
awaitFulfill与TransferStack相同没必要说。
clean方法
void clean(QNode pred, QNode s)
s.waiter = null; // forget thread
while (pred.next == s) // Return early if already unlinked
QNode h = head;
QNode hn = h.next; // Absorb cancelled first node as head
if (hn != null && hn.isCancelled())
advanceHead(h, hn);
continue;
QNode t = tail; // Ensure consistent read for tail
if (t == h)
return;
QNode tn = t.next;
if (t != tail)
continue;
if (tn != null)
advanceTail(t, tn);
continue;
if (s != t) // If not tail, try to unsplice
QNode sn = s.next;
if (sn == s || pred.casNext(s, sn))
return;
QNode dp = cleanMe;
if (dp != null) // Try unlinking previous cancelled node
QNode d = dp.next;
QNode dn;
if (d == null || // d is gone or
d == dp || // d is off list or
!d.isCancelled() || // d not cancelled or
(d != t && // d not tail and
(dn = d.next) != null && // has successor
dn != d && // that is on list
dp.casNext(d, dn))) // d unspliced
casCleanMe(dp, null);
if (dp == pred)
return; // s is already saved node
else if (casCleanMe(null, pred))
return; // Postpone cleaning s
虽然流程图在这里。这里clean方法和cleanMe方法是我看的比较糊涂的地方。搜集了一下网上的资料大概总结一下。这里的为什么要这样做。
首先一个。这里的队列其实是单向链表。所以他只能设置后继的节点而不能设置前向的节点,这会产生一个问题,就是加入队列尾的节点失效了要删除怎么办?我们没办法引用队列尾部倒数第二个节点。所以这里采用了一个方法就是讲当前的尾结点保存问cleanMe节点,这样在下次再次清除的时候通常cleanMe通常就不是尾结点了,这样就可以删除了。也就是每次调用的时候删除的其实是上次需要结束的节点。
以上是关于源码分析-SynchronousQueue的主要内容,如果未能解决你的问题,请参考以下文章
并发编程—— Java 并发队列 BlockingQueue 实现之 SynchronousQueue源码分析
死磕 java集合之LinkedTransferQueue源码分析