数据结构 - SynchronousQueue 线程通信阻塞队列

Posted yuanjiangnan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了数据结构 - SynchronousQueue 线程通信阻塞队列相关的知识,希望对你有一定的参考价值。

技术图片

简介

SynchronousQueue 没有长度,每一个入队操作必须对应一个出队操作,或者每一个出队操作必须对应一个入栈操作,否则阻塞。SynchronousQueue内部提供两种模式TransferStack非公平模式(LIFO)和TransferQueue公平模式(FIFO)。

SynchronousQueue 类

public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable

SynchronousQueue 继承AbstractQueue抽象类,并实现BlockingQueue接口

重要内部类Transferer

abstract static class Transferer<E>

Transferer是抽象类,它有两个实现TransferStack、TransferQueue

Transferer 方法

abstract E transfer(E e, boolean timed, long nanos);

此方法可以既可以执行put也可以执行take操作。

重要内部类TransferStack

static final class TransferStack<E> extends Transferer<E>

TransferStack 内部类SNode

static final class SNode

TransferStack.SNode 属性

// 后面节点
volatile SNode next;
// 匹配节点
volatile SNode match;
// 等待线程
volatile Thread waiter;
// 元素
Object item;
// 节点模式
int mode;
// 内存操作不安全类
private static final sun.misc.Unsafe UNSAFE;
// 匹配节点偏移量
private static final long matchOffset;
// 后续节点偏移量
private static final long nextOffset;

TransferStack.SNode 静态加载偏移量

static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = SNode.class;
        matchOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("match"));
        nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

TransferStack.SNode 构造函数

SNode(Object item) {
    this.item = item;
}

TransferStack.SNode 方法

// 修改后续节点
boolean casNext(SNode cmp, SNode val) {
    return cmp == next &&
            UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// 尝试匹配
boolean tryMatch(SNode s) {
    // 修改匹配节点    
    if (match == null &&
            UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
        // 唤醒等待线程
        Thread w = waiter;
        if (w != null) {
            waiter = null;
            LockSupport.unpark(w);
        }
        return true;
    }
    return match == s;
}
// 尝试取消等待
void tryCancel() {
    UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
// 是否匹配
boolean isCancelled() {
    return match == this;
} 

TransferStack 属性

// 消费者
static final int REQUEST    = 0;
// 生产者
static final int DATA       = 1;
// 生产者在等待消费者消费
static final int FULFILLING = 2;
// 头节点
volatile SNode head;
// 内存操作不安全类
private static final sun.misc.Unsafe UNSAFE;
// head偏移量
private static final long headOffset;

TransferStack 静态加载偏移量

static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = TransferStack.class;
        headOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("head"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

TransferStack 基础方法

// 更新头节点
boolean casHead(SNode h, SNode nh) {
    // 把头节点从h更新为nh
    return h == head &&
            UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
}
// 判断是否为FULFILLING模式
static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
// 设置节点属性,节点为空创建新节点
static SNode snode(SNode s, Object e, SNode next, int mode) {
    // s 为空,创建新节点
    if (s == null) s = new SNode(e);
    // 设置s属性
    s.mode = mode;
    s.next = next;
    return s;
}
// 如果节点在栈头或栈头为FULFILLING的节点,则返回true
boolean shouldSpin(SNode s) {
    SNode h = head;
    return (h == s || h == null || isFulfilling(h.mode));
}

TransferStack 重要方法

入队出队
E transfer(E e, boolean timed, long nanos) {
    // 根据元素判断模式
    SNode s = null;
    int mode = (e == null) ? REQUEST : DATA;
    // 自旋
    for (;;) {
        SNode h = head;
        // 头节点模式与当前模式一样
        if (h == null || h.mode == mode) {
            // 如果超时,则取消等待
            if (timed && nanos <= 0) {
                if (h != null && h.isCancelled())
                    casHead(h, h.next);
                else
                    return null;
            }
            // 没有超时,入栈,head指向他
            else if (casHead(h, s = snode(s, e, h, mode))) {
                // 自旋等待匹配
                SNode m = awaitFulfill(s, timed, nanos);
                // 取消等待
                if (m == s) {
                    // 清理取消等待的节点
                    clean(s);
                    return null;
                }
                // 头节点匹配成功,头后移
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);
                // REQUEST返回匹配元素,DATA返回本身
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        }
        // 头节点不为Fulfilling模式
        else if (!isFulfilling(h.mode)) {
            // 头节点是否取消等待
            if (h.isCancelled())
                // 头节点后移
                casHead(h, h.next);
            // 入栈修改头节点
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                // 自旋
                for (;;) {
                    SNode m = s.next;
                    // next节点为null,则出栈
                    if (m == null) {
                        casHead(s, null);
                        s = null;
                        break;
                    }
                    SNode mn = m.next;
                    // 尝试匹配是s节点
                    if (m.tryMatch(s)) {
                        // s出栈
                        casHead(s, mn);
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else
                        // 节点后移
                        s.casNext(m, mn);
                }
            }
        }
        // 头节点为Fulfilling模式
        else {
            SNode m = h.next;
            // 头节点next为空,修改头节点
            if (m == null)
                casHead(h, null);
            else {
                SNode mn = m.next;
                // 试匹配,如果匹配成功,
                // 栈头和匹配节点出栈,否则跳过后继节点 
                if (m.tryMatch(h))
                    casHead(h, mn);
                else
                    h.casNext(m, mn);
            }
        }
    }
}

自旋或阻塞

SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    // 获取剩余等待时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    // 获取自旋次数
    int spins = (shouldSpin(s) ?
            (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    // 自旋
    for (;;) {
        // 响应中断
        if (w.isInterrupted())
            s.tryCancel();
        // 获取匹配节点
        SNode m = s.match;
        if (m != null)
            return m;
        // 是否需要等待
        if (timed) {
            nanos = deadline - System.nanoTime();
            // 等待时间小于0,出队
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        // 自旋次数减一
        if (spins > 0)
            spins = shouldSpin(s) ? (spins-1) : 0;
        // 数组等待线程
        else if (s.waiter == null)
            s.waiter = w;
        // 等待
        else if (!timed)
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

移除队列中取消等待的线程节点

void clean(SNode s) {
    // 置空元素和等待线程
    s.item = null;
    s.waiter = null;
    SNode past = s.next;
    // past已取消节点后移
    if (past != null && past.isCancelled())
        past = past.next;
    // 循环修改头节点
    SNode p;
    while ((p = head) != null && p != past && p.isCancelled())
        // 设置栈头节点的next为第一个非取消等待的节点
        casHead(p, p.next);
    // 遍历栈
    while (p != null && p != past) {
        SNode n = p.next;
        // 移除取消等待的节点
        if (n != null && n.isCancelled())
            p.casNext(n, n.next);
        else
            // 节点后移
            p = n;
    }
}

这里说一下大概逻辑,TransferStack在执行读写时,首先判断元素是否为空,为空REQUEST模式,否则DATA模式。队列为空或当前模式与队头模式一样,自旋阻塞;队列不为空且与队头的模式不同,匹配成功,出队操作;队列不为空且队头为FULFILLING模式,从队头往后遍历找第一个非FULFILLING模式匹配,匹配成功出队。

重要内部类TransferQueue

static final class TransferQueue<E> extends Transferer<E>

TransferQueue 内部类QNode

static final class QNode

TransferQueue.QNode 属性

// 下一个节点
volatile QNode next;
// 节点元素
volatile Object item;
// 等待线程
volatile Thread waiter;
// 是否为DATA模式
final boolean isData;
// 内存操作不安全类
private static final sun.misc.Unsafe UNSAFE;
// item偏移量
private static final long itemOffset;
// next偏移量
private static final long nextOffset;

TransferQueue.QNode 加载获取偏移量

static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = QNode.class;
        itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));
        nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

TransferQueue.QNode 构造函数

QNode(Object item, boolean isData) {
    this.item = item;
    this.isData = isData;
}

TransferQueue.QNode 方法

// CAS设置next属性
boolean casNext(QNode cmp, QNode val) {
    return next == cmp &&
            UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// CAS设置元素值
boolean casItem(Object cmp, Object val) {
    return item == cmp &&
            UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
// 取消节点等待(方便GC)
void tryCancel(Object cmp) {
    UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}   
// 是否取消等待
boolean isCancelled() {
    return item == this;
}
// 是否已出队
boolean isOffList() {
    return next == this;
}

TransferQueue 属性

// 头节点
transient volatile QNode head;
// 尾节点
transient volatile QNode tail;
// 待取消节点
transient volatile QNode cleanMe;
// 内存操作不安全类
private static final sun.misc.Unsafe UNSAFE;
// 头节点偏移量
private static final long headOffset;
// 尾节点偏移量
private static final long tailOffset;
// 待取消节点偏移量
private static final long cleanMeOffset;

TransferQueue 加载获取偏移量

static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = TransferQueue.class;
        headOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("head"));
        tailOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("tail"));
        cleanMeOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("cleanMe"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

TransferQueue 构造函数

TransferQueue() {
    // 初始化一个空的QNode
    // isData为false
    QNode h = new QNode(null, false);
    // 设置头节点和尾节点
    head = h;
    tail = h;
}

TransferQueue 基础方法

// 尝试修改新头节点
void advanceHead(QNode h, QNode nh) {
    if (h == head &&
            UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
        h.next = h; // forget old next
}
// 尝试修改新尾节点
void advanceTail(QNode t, QNode nt) {
    if (tail == t)
        UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}
// 尝试修改新取消等待节点
boolean casCleanMe(QNode cmp, QNode val) {
    return cleanMe == cmp &&
            UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
}

TransferQueue 重要方法

入队出队

E transfer(E e, boolean timed, long nanos) {
    QNode s = null;
    // e不为null,则为DATA模式,否则为REQUEST模式
    boolean isData = (e != null);
    for (;;) {
        // 获取当前头尾节点
        QNode t = tail;
        QNode h = head;
        // 头或尾节点为空
        if (t == null || h == null)
            continue;
        // 头尾节点一样,模式一样
        if (h == t || t.isData == isData) {
            QNode tn = t.next;
            // 尾节点已变化
            if (t != tail)
                continue;
            // 尾节点next不为空
            if (tn != null) {
                // 尝试修改尾节点
                advanceTail(t, tn);
                continue;
            }
            // 超时,并且剩余时间小于0
            if (timed && nanos <= 0)
                // 返回null
                return null;
            // 新节点为空,初始化新节点
            if (s == null)
                s = new QNode(e, isData);
            // 尾节点的next为null,就把新节点加到后面
            if (!t.casNext(null, s))
                // 替换失败,开始下轮自旋
                continue;
            // 尝试修改尾节点
            advanceTail(t, s);
            // 阻塞等待
            Object x = awaitFulfill(s, e, timed, nanos);
            // 如果s指向自己,s出队列
            if (x == s) {
                // 清除队列中取消等待的线程节点
                clean(t, s);
                return null;
            }
            // 是否已出队
            if (!s.isOffList()) {
                // 修改头节点
                advanceHead(t, s);
                // 元素指向自己
                if (x != null)
                    s.item = s;
                // 等待取消
                s.waiter = null;
            }
            // 返回元素
            return (x != null) ? (E)x : e;
        }
        // 头尾节点不一样
        else {
            QNode m = h.next;
            // 头尾节点变化
            if (t != tail || m == null || h != head)
                continue;
            // 获取当前元素
            Object x = m.item;
            // 后续节点模式跟当前模式一样
            // 或者已经尝试取消
            // 或者修改后续节点元素为当前元素(交换元素)
            if (isData == (x != null) || x == m ||
                    !m.casItem(x, e)) {
                // 出队
                advanceHead(h, m);
                // 进入下次自旋,修改对方线程
                continue;
            }
            // 直接修改头节点
            advanceHead(h, m);
            // 解除后续节点阻塞
            LockSupport.unpark(m.waiter);
            // 返回后续节点元素
            return (x != null) ? (E)x : e;
        }
    }
}

自旋阻塞

Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
    // 计算超时时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    // 自旋次数
    int spins = ((head.next == s) ?
            (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        // 如果中断,则取消等待
        if (w.isInterrupted())
            // 把s的item从e修改为s
            s.tryCancel(e);
        // 获取s的元素
        Object x = s.item;
        // s的item不为e,直接返回x
        if (x != e)
            return x;
        // 超时
        if (timed) {
            // 计算剩余超时时间
            nanos = deadline - System.nanoTime();
            // 超时时间小于等于0,取消节点等待
            if (nanos <= 0L) {
                s.tryCancel(e);
                continue;
            }
        }
        // 自旋次数减一
        if (spins > 0)
            --spins;
        // 等待线程不为空
        else if (s.waiter == null)
            // 设置等待线程为当前线程
            s.waiter = w;
        // 没有超时
        else if (!timed)
            // 开始阻塞
            LockSupport.park(this);
        // 超时时间大于1000
        else if (nanos > spinForTimeoutThreshold)
            // 阻塞
            LockSupport.parkNanos(this, nanos);
    }
}

移除队列中取消等待的线程节点

void clean(QNode pred, QNode s) {
    s.waiter = null;
    // 遍历
    while (pred.next == s) {
        // 获取头节点和头节点next
        QNode h = head;
        QNode hn = h.next;
        // 头节点next不为空,并且是取消等待节点
        if (hn != null && hn.isCancelled()) {
            // 修改头节点
            advanceHead(h, hn);
            continue;
        }
        // 获取尾节点
        QNode t = tail;
        // 头尾一样时返回
        if (t == h)
            return;
        // 尾节点有next
        QNode tn = t.next;
        // 非一致性读
        if (t != tail)
            continue;
        // 尾节点next不为空,修改尾节点
        if (tn != null) {
            advanceTail(t, tn);
            continue;
        }
        // s不是尾节点
        if (s != t) {
            // 修改pred下级节点
            QNode sn = s.next;
            if (sn == s || pred.casNext(s, sn))
                return;
        }
        // 获取取消节点
        QNode dp = cleanMe;
        // 取消节点不为空
        if (dp != null) {
            // 获取取消节点next
            QNode d = dp.next;
            QNode dn;
            // 移除前一个取消等待的节点
            if (d == null ||
                    d == dp ||
                    !d.isCancelled() ||
                    (d != t &&
                            (dn = d.next) != null &&
                            dn != d &&
                            dp.casNext(d, dn)))
                casCleanMe(dp, null);
            if (dp == pred)
                return;
        }
        // 设置取消节点
        else if (casCleanMe(null, pred))
            return;
    }
}

这里说一下大概逻辑,TransferQueue在执行读写时,首先判断元素是否为空,为空REQUEST模式,否则DATA模式。队列为空或当前模式与队尾模式一样,自旋阻塞;队列不为空且与队头的模式不同,匹配成功,出队操作。

SynchronousQueue 属性

// CPU的数量
static final int NCPUS = Runtime.getRuntime().availableProcessors();
// 有超时的情况自旋多少次,当CPU数量小于2的时候不自旋
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
// 没有超时的情况自旋多少次
static final int maxUntimedSpins = maxTimedSpins * 16;
// 剩余时间阈值常量(没有超时时间会用到)
static final long spinForTimeoutThreshold = 1000L;
// Transferer模式
private transient volatile Transferer<E> transferer;
// 下面三个都是序列化时使用
private ReentrantLock qlock;
private WaitQueue waitingProducers;
private WaitQueue waitingConsumers;

SynchronousQueue 构造函数

// 默认初始化
public SynchronousQueue() {
    // 默认非公平模式
    this(false);
}
// 设置是否使用公平模式
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

SynchronousQueue 基础方法

public boolean isEmpty() {
    return true;
}
public int size() {
    return 0;
}
public int remainingCapacity() {
    return 0;
}
public void clear() {
    
}
public boolean contains(Object o) {
    return false;
}
public boolean remove(Object o) {
    return false;
}
public boolean containsAll(Collection<?> c) {
    return c.isEmpty();
}
public boolean removeAll(Collection<?> c) {
    return false;
}
public boolean retainAll(Collection<?> c) {
    return false;
}
public E peek() {
    return null;
}

可以说SynchronousQueue是没有容量的(只有生成者线程或者消费者线程),所以长度可以看做0,也不能peek。

SynchronousQueue 入队出队

// 入队,没有出队操作一直阻塞
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // 返回为null,则put失败,中断当前线程
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}
// 入队,超时阻塞
public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
        return true;
    // 返回为null,则offer失败,中断当前线程
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();
}
// 入队
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    // 入队
    return transferer.transfer(e, true, 0) != null;
}
// 出队,没有入队操作一直阻塞
public E take() throws InterruptedException {
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}
// 出队,超时阻塞
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 出队,获取返回值
    E e = transferer.transfer(null, true, unit.toNanos(timeout));
    // 返回值不为空,或者线程未中断,返回结果
    if (e != null || !Thread.interrupted())
        return e;
    throw new InterruptedException();
}
// 出队
public E poll() {
    // 出队
    return transferer.transfer(null, true, 0);
}

技术图片

以上是关于数据结构 - SynchronousQueue 线程通信阻塞队列的主要内容,如果未能解决你的问题,请参考以下文章

SynchronousQueue

SynchronousQueue 源码解析

SynchronousQueue 1.8 源码解析

Java中的阻塞队列-SynchronousQueue

java同步阻塞队列之SynchronousQueue

并发编程—— Java 并发队列 BlockingQueue 实现之 SynchronousQueue源码分析