源码角度了解阻塞队列之SynchronousQueue
Posted 周杰伦本人
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了源码角度了解阻塞队列之SynchronousQueue相关的知识,希望对你有一定的参考价值。
源码角度了解阻塞队列之SynchronousQueue
SynchronousQueue是一个同步队列,它没有任何的容量,插入操作都必须等待另一个线程的相应删除操作
从它的构造方法中我们可以看到,可以指定是否为公平的队列,如果是公平的使用队列,如果不是公平的,使用栈来存储
put()方法
public void put(E e) throws InterruptedException
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null)
Thread.interrupted();
throw new InterruptedException();
我们可以看到它的put()方法没有什么逻辑,主要调用Transfererd的transfer()方法来放入元素
transfer()方法
Transfererd是一个抽象类,它的实现类有TransferQueue和TransferStack,分别是公平队列的实现和非公平队列的实现
我们先看一下TransferQueue的transfer()方法的实现逻辑
TransferQueue的transfer()方法
E transfer(E e, boolean timed, long nanos)
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);
for (;;)
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
continue; // spin
if (h == t || t.isData == isData) // empty or same-mode
QNode tn = t.next;
if (t != tail) // inconsistent read
continue;
if (tn != null) // lagging tail
advanceTail(t, tn);
continue;
if (timed && nanos <= 0) // cant wait
return null;
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s)) // failed to link in
continue;
advanceTail(t, s); // swing tail and wait
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) // wait was cancelled
clean(t, s);
return null;
if (!s.isOffList()) // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
return (x != null) ? (E)x : e;
else // complementary-mode
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
代码有点长,我们分析一下:
- isData代表是生产者模式还是消费者模式,如果传入的元素e是空的话显然是消费者模式
- for循环,初始化QNode的头结点和尾结点,如果为空的话说明没有初始化好,继续for循环进行初始化
- 如果头尾节点相同或者是尾部节点模式和传入的模式相同的话,添加到尾部
- 新建节点,添加到尾部,然后调用awaitFulfill()方法进入阻塞状态
- 当线程唤醒发现在等待队列中第一个的时候,返回元素
- 如果模式不相同的话,比如来的是消费者,而等待队列全是生产者,这时候与队列中的第一个元素进行配的,配对成功出队列,同时调用LockSupport.unpark(m.waiter)来唤醒生产者
TransferStack的transfer()方法
- 同样判断是否为同种模式,如果是同种模式,入栈 阻塞
- 如果不是同种模式,进行配对一起出栈
take()方法
SynchronousQueue的take()方法也是调用transferer的transfer()方法来获取元素,然后返回元素
public E take() throws InterruptedException
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
总结
SynchronousQueue有公平队列和非公平队列两种实现方式,公平队列采用队列实现先进先出,非公平队列采用栈实现,先进后出,它的最大特点就是生产者来了需要阻塞,当对应的消费者来之后匹配成功才会被唤醒,这个SynchronousQueue产生阻塞,效率不高,一般不怎么使用
至此,我们的阻塞队列的实现基本介绍了,有ArrayBlockingQueue、LinkedBlockingQueue、PriorityQueue、DelayQueue和SynchronousQueue
❤️ 感谢大家
如果你觉得这篇内容对你挺有有帮助的话:
- 欢迎关注我❤️,点赞
以上是关于源码角度了解阻塞队列之SynchronousQueue的主要内容,如果未能解决你的问题,请参考以下文章
JDK源码那些事儿之ConcurrentLinkedQueue