java同步阻塞队列之SynchronousQueue
Posted Leo Han
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java同步阻塞队列之SynchronousQueue相关的知识,希望对你有一定的参考价值。
SynchronousQueue是一个比较独特的队列,其本身是没有容量的,比如我放一个元素到队列中去,不能立马返回,必须要等有人消费了这个元素之后才能返回。
SynchronousQueue底层,提供了两种数据结构,队列和栈,实现了公平调度和非公平调度。
其内部有一个接口Transferer
是其实现的基础:
abstract static class Transferer<E>
abstract E transfer(E e, boolean timed, long nanos);
而Transferer
有两个实现TransferQueue
和TransferStack
,这两种结构来实现入队和出队。
public SynchronousQueue()
this(false);
public SynchronousQueue(boolean fair)
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
默认SynchronousQueue
的构造中是非公平的,采用TransferStack
实现。我们看下其入队和出队操作:
public void put(E e) throws InterruptedException
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null)
Thread.interrupted();
throw new InterruptedException();
public E take() throws InterruptedException
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
可以看到,入队和出队都是调用transferer.transfer
方法,只不过入队元素不为空,出队时候,参数中元素为空。我们看下基于栈的``TransferStack是怎么实现的:
E transfer(E e, boolean timed, long nanos)
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;
for (;;)
SNode h = head;
// 栈头为空或者本次操作mode和栈头mode一样,这时候只能入栈,找不到匹配的操作,某一个时刻,栈内永远只有一种节点至多只有一个另外类型节点,这时候表名需要匹配
if (h == null || h.mode == mode) // empty or same-mode
// 不进行阻塞等待
if (timed && nanos <= 0) // can't wait
// 栈头操作取消,直接将栈头下一个元素设置为栈头
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else
// 栈头是空的,直接返回空
return null;
else if (casHead(h, s = snode(s, e, h, mode))) // 没有超时,直接将新元素作为栈头
// e等待出栈,一种是空队列,一种是take,awaitFulfill会阻塞节点,一直到节点有匹配的操作,比如take匹配put,
//这里返回不是节点本身,而是节点匹配操作的节点,比如take节点返回的则是put节点,如果没有则会先自旋然后阻塞等待
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) // wait was cancelled
clean(s);
return null;
// s后继节点不为空,将后继节点设置为head,如果走到这一步表示已经找到了匹配的节点,可以返回了
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
// REQUEST表示的是take操作,需要获取put操作的数据也就是上面匹配到的m节点,否则返回s
return (E) ((mode == REQUEST) ? m.item : s.item);
else if (!isFulfilling(h.mode)) // 到这一步,表名栈头不为空,且当前操作元素与栈头节点mode不一样,这时候可以唤醒栈头节点
// 在这个条件下操作时,新加入的元素跟栈里面的元素mode肯定不一样(未加入前栈里元素mode都是一样的)
// 理论上是肯定能匹配到的,但是有可能后续节点取消了等待,那么久从栈头一直往后遍历找,如果到最后都没找到
// 这时候栈除了栈头,栈都空了,这时候将栈头也设置为空,重新走最外层的循环
if (h.isCancelled()) // 头结点取消了,那么将头结点的next节点设置为头结点
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) // 这时候,将当前元素入栈到栈头
for (;;) // loop until matched or waiters disappear
SNode m = s.next; // 理论上 ,s.next就是s节点的匹配节点
if (m == null) // 如果这时候m 为空,表名,栈里面出栈头,其他线程都取消等待了,没有元素了
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
// 一直往栈里面找,只要是节点没有取消等待,肯定是能够找到的
SNode mn = m.next;
if (m.tryMatch(s)) // 匹配上,直接返回
casHead(s, mn); // 这时候弹出的是栈头的两个元素,即栈头和栈头的下一个元素(匹配上),重置栈头,应该设置栈头为 h.next.next
return (E) ((mode == REQUEST) ? m.item : s.item);
else // 没有匹配上,继续往后续节点找,这时候s.next取消了等待,可以移除了
s.casNext(m, mn); // 将s.next设置为 s.next.next,同时将s.next移除,这时候s.next可能是在并发操作的时候取消了等待
else // 这种情况一般是上面条件在执行,之后另外一个线程新加入元素,这时候栈里面应该是有一对能够匹配上的节点
SNode m = h.next; // m is h's match
if (m == null) // m节点取消了等待
casHead(h, null); // pop fulfilling node
else
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
以上是关于java同步阻塞队列之SynchronousQueue的主要内容,如果未能解决你的问题,请参考以下文章
java同步阻塞队列之LinkedBlockingQueue实现原理,和ArrayBlockingQueue对比
java同步阻塞队列之DelayQueue实现原理,PriorityQueue原理
Java同步数据结构之ConcurrentLinkedQueue