并发工具源码解析

Posted fatmanhappycode

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发工具源码解析相关的知识,希望对你有一定的参考价值。

Reentrantlock

Reentrantlock在AQS源码解析中已经捎带着解析过了,这里不再提及

 


 

 

CountDownLatch

CountDownLatch在AQS源码解析中也已经解析过了,这里同样不再提及

 


 

CyclicBarrier

CountDownLatch 基于 AQS 的共享模式的使用,而 CyclicBarrier 基于 Condition 来实现。

 

基本流程

技术图片

 

属性

public class CyclicBarrier {
    // 我们说了,CyclicBarrier 是可以重复使用的,我们把每次从开始使用到穿过栅栏当做"一代",或者"一个周期"
    private static class Generation {
        boolean broken = false;
    }

    private final ReentrantLock lock = new ReentrantLock();

    // CyclicBarrier 是基于 Condition 的
    // Condition 是“条件”的意思,CyclicBarrier 的等待线程通过 barrier 的“条件”是大家都到了栅栏上
    private final Condition trip = lock.newCondition();

    // 参与的线程数
    private final int parties;

    // 如果设置了这个,代表越过栅栏之前,要执行相应的操作
    private final Runnable barrierCommand;

    // 当前所处的“代”
    private Generation generation = new Generation();

    // 还没有到栅栏的线程数,这个值初始为 parties,然后递减
    // 还没有到栅栏的线程数 = parties - 已经到栅栏的数量
    private int count;

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    public CyclicBarrier(int parties) {
        this(parties, null);
    }

 

await () 

// 不带超时机制
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

// ====================================================================================================================================

// 带超时机制,如果超时抛出 TimeoutException 异常
public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

 

doawait ()

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
    final ReentrantLock lock = this.lock;
    // 先要获取到锁,然后在 finally 中要记得释放锁
    // 如果记得 Condition 部分的话,我们知道 condition 的 await() 会释放锁,被 signal() 唤醒的时候需要重新获取锁
    lock.lock();
    try {
        final Generation g = generation;
        // 检查栅栏是否被打破,如果被打破,抛出 BrokenBarrierException 异常
        if (g.broken)
            throw new BrokenBarrierException();
        // 检查中断状态,如果中断了,抛出 InterruptedException 异常
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        // index 是这个 await 方法的返回值
        // 注意到这里,这个是从 count 递减后得到的值
        int index = --count;

        // 如果等于 0,说明所有的线程都到栅栏上了,准备通过
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                // 如果在初始化的时候,指定了通过栅栏前需要执行的操作,在这里会得到执行
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                // 如果 ranAction 为 true,说明执行 command.run() 的时候,没有发生异常退出的情况
                ranAction = true;
                // 唤醒等待的线程,然后开启新的一代
                nextGeneration();
                return 0;
            } finally {
                if (!ranAction)
                    // 进到这里,说明执行指定操作的时候,发生了异常,那么需要打破栅栏
                    // 之前我们说了,打破栅栏意味着唤醒所有等待的线程,设置 broken 为 true,重置 count 为 parties
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        // 如果是最后一个线程调用 await,那么上面就返回了
        // 下面的操作是给那些不是最后一个到达栅栏的线程执行的
        for (;;) {
            try {
                // 如果带有超时机制,调用带超时的 Condition 的 await 方法等待,直到最后一个线程调用 await
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 如果到这里,说明等待的线程在 await(是 Condition 的 await)的时候被中断
                if (g == generation && ! g.broken) {
                    // 打破栅栏
                    breakBarrier();
                    // 打破栅栏后,重新抛出这个 InterruptedException 异常给外层调用的方法
                    throw ie;
                } else {
                    // 到这里,说明 g != generation, 说明新的一代已经产生,即最后一个线程 await 执行完成,
                    // 那么此时没有必要再抛出 InterruptedException 异常,记录下来这个中断信息即可
                    // 或者是栅栏已经被打破了,那么也不应该抛出 InterruptedException 异常,
                    // 而是之后抛出 BrokenBarrierException 异常
                    Thread.currentThread().interrupt();
                }
            }

              // 唤醒后,检查栅栏是否是“破的”
            if (g.broken)
                throw new BrokenBarrierException();

            // 这个 for 循环除了异常,就是要从这里退出了
            // 我们要清楚,最后一个线程在执行完指定任务(如果有的话),会调用 nextGeneration 来开启一个新的代
            // 然后释放掉锁,其他线程从 Condition 的 await 方法中得到锁并返回,然后到这里的时候,其实就会满足 g != generation 的
            // 那什么时候不满足呢?barrierCommand 执行过程中抛出了异常,那么会执行打破栅栏操作,
            // 设置 broken 为true,然后唤醒这些线程。这些线程会从上面的 if (g.broken) 这个分支抛 BrokenBarrierException 异常返回
            // 当然,还有最后一种可能,那就是 await 超时,此种情况不会从上面的 if 分支异常返回,也不会从这里返回,会执行后面的代码
            if (g != generation)
                return index;

            // 如果醒来发现超时了,打破栅栏,抛出异常
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

 

CyclicBarrier 与 CountDownLatch 区别

  • CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的
  • CountDownLatch 参与的线程的职责是不一样的,有的在倒计时,有的在等待倒计时结束。CyclicBarrier 参与的线程职责是一样的。

 


 

Semaphore

 

 acquire (int permits)

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
// 这里的tryAcquireShared获取共享锁成功则返回剩余state值,为正数,设置state,不进入if条件中,直接不阻塞继续执行; // 获取失败(state值不够acquire需要的),则返回负数(available-acquire),不设置state,进入if条件中,执行doAcquireSharedInterruptibly将线程加入阻塞队列并挂起 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }

 

tryAcquireShared(int arg)

tryAcquireShared在aqs没有提供实现,而在Semaphore中有公平和非公平两种实现

// 公平策略:
protected int tryAcquireShared(int acquires) {
    for (;;) {
        // 区别就在于是不是会先判断是否有线程在排队,然后才进行 CAS 减操作
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

 

doAcquireSharedInterruptibly(arg)

这个方法和countDownLatch中调用的是同一个方法,是aqs提供的实现,大概就是直接入队后挂起

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 1. 入队
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                // 这里的tryAcquireShared获取共享锁成功则返回剩余state值,为正数,设置state
                // 获取失败(state值不够acquire需要的),则返回负数(available-acquire),不设置state
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 2
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

 


 

release ()

public void release() {
        sync.releaseShared(1);
}


public final boolean releaseShared(int arg) {
        // 这里的tryReleaseShared必定返回true
        if (tryReleaseShared(arg)) {
            // 这里和countDownLatch一样是aqs里的实现,会释放队列中除head外第一个线程
            doReleaseShared();
            return true;
        }
        return false;
}

也就是说,每次release,不管释放的资源够不够用,它都会唤醒第一个线程去尝试获取一下资源,不行再重新把他挂起。

 

tryReleaseShared(int releases)

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        // 溢出,当然,我们一般也不会用这么大的数
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

 

doReleaseShared()

不多解释,看不懂说明你没理解aqs,建议回去看看

// 调用这个方法的时候,state == 0
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // t3 入队的时候,已经将头节点的 waitStatus 设置为 Node.SIGNAL(-1) 了
            if (ws == Node.SIGNAL) {
                // 将 head 的 waitStatue 设置为 0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 
                continue;                
        }
        if (h == head)                   
            break;
    }
}

之后唤起的线程会回到挂起的地方继续

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 1. 入队
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                // 这里的tryAcquireShared获取共享锁成功则返回剩余state值,为正数,设置state
                // 获取失败(state值不够acquire需要的),则返回负数(available-acquire),不设置state
                int r = tryAcquireShared(arg);
                // 这里大于0说明获取成功,那么应该顺便叫一下下一个线程
                if (r >= 0) {
                    // 这个aqs里讲过,这里不细讲,大概就是唤醒队列的下一个线程
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 2
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

 


 

ReentrantReadWriteLock

 

 参考

 

读锁间不阻塞。写锁(独占锁)被占有时,其他线程阻塞。有写锁可以再获取读锁,叫锁降级。有读锁不可以再获取写锁。

ReadLock 使用了共享模式,WriteLock 使用了独占模式。

将 state 这个 32 位的 int 值分为高 16 位和低 16位,分别用于共享模式和独占模式。

非公平模式下如果碰上获取写锁的线程马上就要获取到锁了(head.next就是写锁),获取读锁的线程不应该和它抢。

 


 

技术图片

 

 

以上是关于并发工具源码解析的主要内容,如果未能解决你的问题,请参考以下文章

java并发编程CountDownLatch类源码解析

Java并发编程-CountDownLatch

Java并发编程-CountDownLatch

JUC并发编程 共享模式之工具 JUC CountdownLatch(倒计时锁) -- CountdownLatch应用(等待多个线程准备完毕( 可以覆盖上次的打印内)等待多个远程调用结束)(代码片段

Java并发编程-Semaphore

Java并发编程-Semaphore