java同步阻塞队列之SynchronousQueue

Posted Leo Han

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java同步阻塞队列之SynchronousQueue相关的知识,希望对你有一定的参考价值。

SynchronousQueue是一个比较独特的队列,其本身是没有容量的,比如我放一个元素到队列中去,不能立马返回,必须要等有人消费了这个元素之后才能返回。
SynchronousQueue底层,提供了两种数据结构,队列和栈,实现了公平调度和非公平调度。
其内部有一个接口Transferer是其实现的基础:

 abstract static class Transferer<E> 
        abstract E transfer(E e, boolean timed, long nanos);
    

Transferer有两个实现TransferQueueTransferStack,这两种结构来实现入队和出队。

public SynchronousQueue() 
        this(false);
    
  public SynchronousQueue(boolean fair) 
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    

默认SynchronousQueue的构造中是非公平的,采用TransferStack实现。我们看下其入队和出队操作:

public void put(E e) throws InterruptedException 
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) 
            Thread.interrupted();
            throw new InterruptedException();
        
    
public E take() throws InterruptedException 
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    

可以看到,入队和出队都是调用transferer.transfer方法,只不过入队元素不为空,出队时候,参数中元素为空。我们看下基于栈的``TransferStack是怎么实现的:

E transfer(E e, boolean timed, long nanos) 

            SNode s = null; // constructed/reused as needed
            int mode = (e == null) ? REQUEST : DATA;

            for (;;) 
                SNode h = head;
                // 栈头为空或者本次操作mode和栈头mode一样,这时候只能入栈,找不到匹配的操作,某一个时刻,栈内永远只有一种节点至多只有一个另外类型节点,这时候表名需要匹配
                if (h == null || h.mode == mode)   // empty or same-mode
                	// 不进行阻塞等待
                    if (timed && nanos <= 0)       // can't wait
                    	// 栈头操作取消,直接将栈头下一个元素设置为栈头
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // pop cancelled node
                        else
                        	// 栈头是空的,直接返回空
                            return null;
                     else if (casHead(h, s = snode(s, e, h, mode)))  // 没有超时,直接将新元素作为栈头
                    	// e等待出栈,一种是空队列,一种是take,awaitFulfill会阻塞节点,一直到节点有匹配的操作,比如take匹配put,
                    	//这里返回不是节点本身,而是节点匹配操作的节点,比如take节点返回的则是put节点,如果没有则会先自旋然后阻塞等待
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s)                // wait was cancelled
                            clean(s);
                            return null;
                        
                        // s后继节点不为空,将后继节点设置为head,如果走到这一步表示已经找到了匹配的节点,可以返回了
                        if ((h = head) != null && h.next == s)
                            casHead(h, s.next);     // help s's fulfiller
						// REQUEST表示的是take操作,需要获取put操作的数据也就是上面匹配到的m节点,否则返回s
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    
                 else if (!isFulfilling(h.mode))  // 到这一步,表名栈头不为空,且当前操作元素与栈头节点mode不一样,这时候可以唤醒栈头节点

                // 在这个条件下操作时,新加入的元素跟栈里面的元素mode肯定不一样(未加入前栈里元素mode都是一样的)
                // 理论上是肯定能匹配到的,但是有可能后续节点取消了等待,那么久从栈头一直往后遍历找,如果到最后都没找到
                // 这时候栈除了栈头,栈都空了,这时候将栈头也设置为空,重新走最外层的循环

                    if (h.isCancelled())            // 头结点取消了,那么将头结点的next节点设置为头结点
                        casHead(h, h.next);         // pop and retry
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode)))  // 这时候,将当前元素入栈到栈头
                        for (;;)  // loop until matched or waiters disappear
                            SNode m = s.next;       // 理论上 ,s.next就是s节点的匹配节点
                            if (m == null)         // 如果这时候m 为空,表名,栈里面出栈头,其他线程都取消等待了,没有元素了
                                casHead(s, null);   // pop fulfill node
                                s = null;           // use new node next time
                                break;              // restart main loop
                            
                            // 一直往栈里面找,只要是节点没有取消等待,肯定是能够找到的
                            SNode mn = m.next;
                            if (m.tryMatch(s))  // 匹配上,直接返回
                                casHead(s, mn);     // 这时候弹出的是栈头的两个元素,即栈头和栈头的下一个元素(匹配上),重置栈头,应该设置栈头为 h.next.next
                                return (E) ((mode == REQUEST) ? m.item : s.item);
                             else                  // 没有匹配上,继续往后续节点找,这时候s.next取消了等待,可以移除了
                                s.casNext(m, mn);   // 将s.next设置为 s.next.next,同时将s.next移除,这时候s.next可能是在并发操作的时候取消了等待
                        
                    
                 else  // 这种情况一般是上面条件在执行,之后另外一个线程新加入元素,这时候栈里面应该是有一对能够匹配上的节点
                    SNode m = h.next;               // m is h's match
                    if (m == null)                  // m节点取消了等待
                        casHead(h, null);           // pop fulfilling node
                    else 
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    
                
            
        

以上是关于java同步阻塞队列之SynchronousQueue的主要内容,如果未能解决你的问题,请参考以下文章

java同步阻塞队列之LinkedBlockingQueue实现原理,和ArrayBlockingQueue对比

java同步阻塞队列之DelayQueue实现原理,PriorityQueue原理

Java同步数据结构之ConcurrentLinkedQueue

Java同步数据结构之LinkedBlockingQueue

并发编程实践之公平有界阻塞队列实现

java阻塞队列 线程同步合作