源码角度了解阻塞队列之SynchronousQueue

Posted 周杰伦本人

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了源码角度了解阻塞队列之SynchronousQueue相关的知识,希望对你有一定的参考价值。

源码角度了解阻塞队列之SynchronousQueue

SynchronousQueue是一个同步队列,它没有任何的容量,插入操作都必须等待另一个线程的相应删除操作

从它的构造方法中我们可以看到,可以指定是否为公平的队列,如果是公平的使用队列,如果不是公平的,使用栈来存储

put()方法

public void put(E e) throws InterruptedException 
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) 
        Thread.interrupted();
        throw new InterruptedException();
    

我们可以看到它的put()方法没有什么逻辑,主要调用Transfererd的transfer()方法来放入元素

transfer()方法

Transfererd是一个抽象类,它的实现类有TransferQueue和TransferStack,分别是公平队列的实现和非公平队列的实现

我们先看一下TransferQueue的transfer()方法的实现逻辑

TransferQueue的transfer()方法

E transfer(E e, boolean timed, long nanos) 
    QNode s = null; // constructed/reused as needed
    boolean isData = (e != null);

    for (;;) 
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null)         // saw uninitialized value
            continue;                       // spin

        if (h == t || t.isData == isData)  // empty or same-mode
            QNode tn = t.next;
            if (t != tail)                  // inconsistent read
                continue;
            if (tn != null)                // lagging tail
                advanceTail(t, tn);
                continue;
            
            if (timed && nanos <= 0)        // cant wait
                return null;
            if (s == null)
                s = new QNode(e, isData);
            if (!t.casNext(null, s))        // failed to link in
                continue;

            advanceTail(t, s);              // swing tail and wait
            Object x = awaitFulfill(s, e, timed, nanos);
            if (x == s)                    // wait was cancelled
                clean(t, s);
                return null;
            

            if (!s.isOffList())            // not already unlinked
                advanceHead(t, s);          // unlink if head
                if (x != null)              // and forget fields
                    s.item = s;
                s.waiter = null;
            
            return (x != null) ? (E)x : e;

         else                             // complementary-mode
            QNode m = h.next;               // node to fulfill
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   // m cancelled
                !m.casItem(x, e))          // lost CAS
                advanceHead(h, m);          // dequeue and retry
                continue;
            

            advanceHead(h, m);              // successfully fulfilled
            LockSupport.unpark(m.waiter);
            return (x != null) ? (E)x : e;
        
    

代码有点长,我们分析一下:

  1. isData代表是生产者模式还是消费者模式,如果传入的元素e是空的话显然是消费者模式
  2. for循环,初始化QNode的头结点和尾结点,如果为空的话说明没有初始化好,继续for循环进行初始化
  3. 如果头尾节点相同或者是尾部节点模式和传入的模式相同的话,添加到尾部
  4. 新建节点,添加到尾部,然后调用awaitFulfill()方法进入阻塞状态
  5. 当线程唤醒发现在等待队列中第一个的时候,返回元素
  6. 如果模式不相同的话,比如来的是消费者,而等待队列全是生产者,这时候与队列中的第一个元素进行配的,配对成功出队列,同时调用LockSupport.unpark(m.waiter)来唤醒生产者

TransferStack的transfer()方法

  1. 同样判断是否为同种模式,如果是同种模式,入栈 阻塞
  2. 如果不是同种模式,进行配对一起出栈

take()方法

SynchronousQueue的take()方法也是调用transferer的transfer()方法来获取元素,然后返回元素

    public E take() throws InterruptedException 
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    

总结

SynchronousQueue有公平队列和非公平队列两种实现方式,公平队列采用队列实现先进先出,非公平队列采用栈实现,先进后出,它的最大特点就是生产者来了需要阻塞,当对应的消费者来之后匹配成功才会被唤醒,这个SynchronousQueue产生阻塞,效率不高,一般不怎么使用

至此,我们的阻塞队列的实现基本介绍了,有ArrayBlockingQueue、LinkedBlockingQueue、PriorityQueue、DelayQueue和SynchronousQueue

❤️ 感谢大家

如果你觉得这篇内容对你挺有有帮助的话:

  1. 欢迎关注我❤️,点赞

    以上是关于源码角度了解阻塞队列之SynchronousQueue的主要内容,如果未能解决你的问题,请参考以下文章

    JDK源码那些事儿之ConcurrentLinkedQueue

    JDK源码那些事儿之LinkedTransferQueue

    JDK源码那些事儿之ConcurrentLinkedDeque

    阻塞队列之ArrayBlockingQueue源码分析

    阻塞队列之ArrayBlockingQueue源码分析

    JAVA并发之阻塞队列浅析