并发工具源码解析
Posted fatmanhappycode
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发工具源码解析相关的知识,希望对你有一定的参考价值。
Reentrantlock
Reentrantlock在AQS源码解析中已经捎带着解析过了,这里不再提及
CountDownLatch
CountDownLatch在AQS源码解析中也已经解析过了,这里同样不再提及
CyclicBarrier
CountDownLatch 基于 AQS 的共享模式的使用,而 CyclicBarrier 基于 Condition 来实现。
基本流程
属性
public class CyclicBarrier { // 我们说了,CyclicBarrier 是可以重复使用的,我们把每次从开始使用到穿过栅栏当做"一代",或者"一个周期" private static class Generation { boolean broken = false; } private final ReentrantLock lock = new ReentrantLock(); // CyclicBarrier 是基于 Condition 的 // Condition 是“条件”的意思,CyclicBarrier 的等待线程通过 barrier 的“条件”是大家都到了栅栏上 private final Condition trip = lock.newCondition(); // 参与的线程数 private final int parties; // 如果设置了这个,代表越过栅栏之前,要执行相应的操作 private final Runnable barrierCommand; // 当前所处的“代” private Generation generation = new Generation(); // 还没有到栅栏的线程数,这个值初始为 parties,然后递减 // 还没有到栅栏的线程数 = parties - 已经到栅栏的数量 private int count; public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); }
await ()
// 不带超时机制 public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } // ==================================================================================================================================== // 带超时机制,如果超时抛出 TimeoutException 异常 public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }
doawait ()
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // 先要获取到锁,然后在 finally 中要记得释放锁 // 如果记得 Condition 部分的话,我们知道 condition 的 await() 会释放锁,被 signal() 唤醒的时候需要重新获取锁 lock.lock(); try { final Generation g = generation; // 检查栅栏是否被打破,如果被打破,抛出 BrokenBarrierException 异常 if (g.broken) throw new BrokenBarrierException(); // 检查中断状态,如果中断了,抛出 InterruptedException 异常 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // index 是这个 await 方法的返回值 // 注意到这里,这个是从 count 递减后得到的值 int index = --count; // 如果等于 0,说明所有的线程都到栅栏上了,准备通过 if (index == 0) { // tripped boolean ranAction = false; try { // 如果在初始化的时候,指定了通过栅栏前需要执行的操作,在这里会得到执行 final Runnable command = barrierCommand; if (command != null) command.run(); // 如果 ranAction 为 true,说明执行 command.run() 的时候,没有发生异常退出的情况 ranAction = true; // 唤醒等待的线程,然后开启新的一代 nextGeneration(); return 0; } finally { if (!ranAction) // 进到这里,说明执行指定操作的时候,发生了异常,那么需要打破栅栏 // 之前我们说了,打破栅栏意味着唤醒所有等待的线程,设置 broken 为 true,重置 count 为 parties breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out // 如果是最后一个线程调用 await,那么上面就返回了 // 下面的操作是给那些不是最后一个到达栅栏的线程执行的 for (;;) { try { // 如果带有超时机制,调用带超时的 Condition 的 await 方法等待,直到最后一个线程调用 await if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // 如果到这里,说明等待的线程在 await(是 Condition 的 await)的时候被中断 if (g == generation && ! g.broken) { // 打破栅栏 breakBarrier(); // 打破栅栏后,重新抛出这个 InterruptedException 异常给外层调用的方法 throw ie; } else { // 到这里,说明 g != generation, 说明新的一代已经产生,即最后一个线程 await 执行完成, // 那么此时没有必要再抛出 InterruptedException 异常,记录下来这个中断信息即可 // 或者是栅栏已经被打破了,那么也不应该抛出 InterruptedException 异常, // 而是之后抛出 BrokenBarrierException 异常 Thread.currentThread().interrupt(); } } // 唤醒后,检查栅栏是否是“破的” if (g.broken) throw new BrokenBarrierException(); // 这个 for 循环除了异常,就是要从这里退出了 // 我们要清楚,最后一个线程在执行完指定任务(如果有的话),会调用 nextGeneration 来开启一个新的代 // 然后释放掉锁,其他线程从 Condition 的 await 方法中得到锁并返回,然后到这里的时候,其实就会满足 g != generation 的 // 那什么时候不满足呢?barrierCommand 执行过程中抛出了异常,那么会执行打破栅栏操作, // 设置 broken 为true,然后唤醒这些线程。这些线程会从上面的 if (g.broken) 这个分支抛 BrokenBarrierException 异常返回 // 当然,还有最后一种可能,那就是 await 超时,此种情况不会从上面的 if 分支异常返回,也不会从这里返回,会执行后面的代码 if (g != generation) return index; // 如果醒来发现超时了,打破栅栏,抛出异常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
CyclicBarrier 与 CountDownLatch 区别
- CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的
- CountDownLatch 参与的线程的职责是不一样的,有的在倒计时,有的在等待倒计时结束。CyclicBarrier 参与的线程职责是一样的。
Semaphore
acquire (int permits)
public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException();
// 这里的tryAcquireShared获取共享锁成功则返回剩余state值,为正数,设置state,不进入if条件中,直接不阻塞继续执行; // 获取失败(state值不够acquire需要的),则返回负数(available-acquire),不设置state,进入if条件中,执行doAcquireSharedInterruptibly将线程加入阻塞队列并挂起 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
tryAcquireShared(int arg)
tryAcquireShared在aqs没有提供实现,而在Semaphore中有公平和非公平两种实现
// 公平策略: protected int tryAcquireShared(int acquires) { for (;;) { // 区别就在于是不是会先判断是否有线程在排队,然后才进行 CAS 减操作 if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
doAcquireSharedInterruptibly(arg)
这个方法和countDownLatch中调用的是同一个方法,是aqs提供的实现,大概就是直接入队后挂起
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 1. 入队 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { // 这里的tryAcquireShared获取共享锁成功则返回剩余state值,为正数,设置state // 获取失败(state值不够acquire需要的),则返回负数(available-acquire),不设置state int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 2 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
release ()
public void release() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { // 这里的tryReleaseShared必定返回true if (tryReleaseShared(arg)) { // 这里和countDownLatch一样是aqs里的实现,会释放队列中除head外第一个线程 doReleaseShared(); return true; } return false; }
也就是说,每次release,不管释放的资源够不够用,它都会唤醒第一个线程去尝试获取一下资源,不行再重新把他挂起。
tryReleaseShared(int releases)
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; // 溢出,当然,我们一般也不会用这么大的数 if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }
doReleaseShared()
不多解释,看不懂说明你没理解aqs,建议回去看看
// 调用这个方法的时候,state == 0 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // t3 入队的时候,已经将头节点的 waitStatus 设置为 Node.SIGNAL(-1) 了 if (ws == Node.SIGNAL) { // 将 head 的 waitStatue 设置为 0 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }
之后唤起的线程会回到挂起的地方继续
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 1. 入队 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { // 这里的tryAcquireShared获取共享锁成功则返回剩余state值,为正数,设置state // 获取失败(state值不够acquire需要的),则返回负数(available-acquire),不设置state int r = tryAcquireShared(arg); // 这里大于0说明获取成功,那么应该顺便叫一下下一个线程 if (r >= 0) { // 这个aqs里讲过,这里不细讲,大概就是唤醒队列的下一个线程 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 2 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
ReentrantReadWriteLock
读锁间不阻塞。写锁(独占锁)被占有时,其他线程阻塞。有写锁可以再获取读锁,叫锁降级。有读锁不可以再获取写锁。
ReadLock 使用了共享模式,WriteLock 使用了独占模式。
将 state 这个 32 位的 int 值分为高 16 位和低 16位,分别用于共享模式和独占模式。
非公平模式下如果碰上获取写锁的线程马上就要获取到锁了(head.next就是写锁),获取读锁的线程不应该和它抢。
以上是关于并发工具源码解析的主要内容,如果未能解决你的问题,请参考以下文章
JUC并发编程 共享模式之工具 JUC CountdownLatch(倒计时锁) -- CountdownLatch应用(等待多个线程准备完毕( 可以覆盖上次的打印内)等待多个远程调用结束)(代码片段