Phaser实现源代码剖析
Posted lytwajue
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Phaser实现源代码剖析相关的知识,希望对你有一定的参考价值。
一般运用的场景是一组线程希望同一时候到达某个运行点后(先到达的会被堵塞),运行一个指定任务,然后这些线程才被唤醒继续运行其他任务。
Phaser通常是定义一个parties数(parties一般代指须要进行同步的线程)。当这些parties到达某个运行点,就会调用await方法。表示到达某个阶段(phase),然后就会堵塞直到有足够的parites数(也就是线程数)都调用过await方法后,这些线程才会被逐个唤醒,另外。在唤醒之前,能够选择性地运行某个定制的任务。
Phaser对照起CyclicBarrier。不仅它是能够反复同步,而且parties数是能够动态注冊的,另外还提供了非堵塞的arrive方法表示先到达阶段等,大大提高了同步模型的灵活性,当然了,实现也会相对复杂。
注冊(Registration)
和其他栅栏不一样,在phaser上进行同步的注冊parites数可能会随着时间改变而不同。能够在不论什么时间注冊任务(调用register,bulkRegister方法,或者能够初始化parties数的构造函数形式),另外还能够在不论什么到达栅栏的时候反注冊(arriveAndDeregister方法)。作为大部分同步结构的基础。注冊和反注冊仅仅会影响内部计数;它们没有做不论什么额外的内部记录,所以任务不能够查询它们是否被注冊。(只是你能够继承这个类以引入这种记录)
同步(Synchronization)
就像CyclicBarrier一样,Phaser能够反复地等待。arriveAndAwaitAdvance方法与CyclicBarrier.await作用类似。每一代(generation)的phaser有一个关联的阶段号(phase number)。
阶段号从0開始。当全部parties都到达阶段的时候就会加一。直到Integer.MAX_VALUE后返回0。阶段号的使用同意在到达某个阶段以及在等待其他parites的时候,通过下面两类能够被不论什么注冊过的party调用的方法。运行单独的操作:
1.到达(Arrival)
arrive方法和arriveAndDeregister方法记录到达。
这些方法不会堵塞。但会返回一个关联的到达阶段号(arrival phase number)。也就是,phaser在到达以后所用的阶段号。
当最后的party到达一个给定的阶段,就能够运行可选的操作,而且阶段号自加一。
这些操作都会被触发阶段添加的party运行。而且会被可重写方法onAdvance(同一时候管理Phaser的终结)安排管理。重写onAdvance方法比起CyclicBarrier提供的栅栏操作非常相似。但更加灵活。
2.等待(Waiting)
awaitAdvance方法须要一个表示到达阶段号的參数。并在phaser添加(或者已经在)不同的阶段的时候返回。与CyclicBarrier类似结构不同。awaitAdvance方法会在等待线程被中断的时候继续等待。
当然也有可中断和超时版本号。可是当任务等待发生中断或者超时遇到的异常也不会改变phaser的状态。假设必要,你能够在这些异常的处理器里运行关联的恢复操作,通常是在调用forceTermination之后恢复。Phaser可能也会被运行在ForkJoinPool中的任务使用,这样当其他线程在等待phaser添加被堵塞的时候,就能够确保有效平行地运行任务。
终结(Termination)
phaser可能进入一个终结状态。能够通过isTerminated来检查。当终结的时候。全部的同步方法都不会在等待下一个阶段而直接返回。返回一个负值来表示该状态。类似地,在终结的时候尝试注冊没有不论什么效果。
当onAdvance调用返回true的时候就会触发终结。
onAdvance默认实现为当一个反注冊导致注冊parties数降为0的时候返回true。当phser要控制操作在一个固定得迭代次数时。就能够非常方便地重写这种方法,当当前阶段号到达阀值得时候就返回true导致终结。forceTermination方法也时还有一个能够突然释放等待线程而且同意它们终结。
堆叠(Tiering)
Phaser能够被堆叠在一起(也就是说,以树形结构构造)来减少竞争。Phaser的parties数非常大的时候。以一组子phasers共享一个公共父亲能够减轻严重的同步竞争的成本。这样做能够大大提供吞吐量,但同一时候也会导致每一个操作的更高的成本。
在一棵堆叠的phaser树中,子phaser在父亲上的注冊和反注冊都会被自己主动管理。当子phaser的注冊parties树为非0的时候。子phaser就会注冊到父亲上。
当因为arriveAndDeregister的调用使注冊的parties数变为0时,子phaser就会从父亲中反注冊。这样就算父phaser的全部parties都到达了阶段,也必须等待子phaser的全部parties都到达了阶段并显式调用父phaser的awaitAdvance才算到达新的阶段。反之亦然。这样父phaser或者子phaser里注冊过的全部parties就能够一起互相等待到新的阶段。
另外,在这个堆叠结构的实现里,能够确保root结点必定是最先更新阶段号。然后才到其子结点,逐渐传递下去。
+------+ +------+ +------+ | root | <-- |parent| <-- | this | +------+ +------+ +------+ parties:3+1 parties:3+1 parties:3如上图所看到的,假设parties数多的时候,能够依据堆叠成为一颗树。这里假设root和parent和this都各初始化3个parties数。然后假设当前结点this有注冊parties数,则会在parent上注冊一个parties,因此其实root和parent都注冊了4个parties数。这样,假设this结点的3个parties数都到达了。就会调用parent的arrive,把parties数减去一,然后parent等待自己3个parties数都到达,就会调用root来减去一,这样root的3个parties数都到达就会一同释放全部等待结点,就实现了整棵树parties之间同步等待的功能。另外这个结构也非常easy看到root结点是最快进行阶段增长的。
这样做最大的优点就是降低对同一个state变量的CAS竞争带来的性能下降。只是同一时候每一个同步操作也会添加对应的负担(每次获取状态都要和root进行阶段同步),所以一般在高并发下造成的性能下降才考虑。
监控(Monitoring)
同步方法仅仅能被注冊的parties调用时,phaser的当前状态能够被不论什么调用者监控。在不论什么时刻。有getRegisteredParties总共的parties。当中,有getArrivedParties个parites到达getPhase的当前阶段。当剩下getUnarrivedParties个parties到达。phase添加。这些方法的返回值可能反映短暂的状态,因此一般在同步控制中不太实用。toString方法以一种能够方便信息监控的格式返回这些状态的快照。
Phaser的详细使用方法能够參考样例:这里源代码剖析
内部状态表示
Phaser内部对于状态(包含parties注冊数、阶段号、未到达的parties数等)管理都使用一个volatile long型变量,同一时候利用CAS进行更新。这样做就能够保证在状态改变时。保持全部状态一致改变,这是实现无锁算法的基础之中的一个。
private volatile long state; private static final int MAX_PARTIES = 0xffff; private static final int MAX_PHASE = Integer.MAX_VALUE; private static final int PARTIES_SHIFT = 16; private static final int PHASE_SHIFT = 32; private static final int UNARRIVED_MASK = 0xffff; // to mask ints private static final long PARTIES_MASK = 0xffff0000L; // to mask longs private static final long COUNTS_MASK = 0xffffffffL; private static final long TERMINATION_BIT = 1L << 63; // some special values private static final int ONE_ARRIVAL = 1; private static final int ONE_PARTY = 1 << PARTIES_SHIFT; private static final int ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY; private static final int EMPTY = 1; //内部状态辅助方法 private static int unarrivedOf(long s) { int counts = (int)s; return (counts == EMPTY) ?state变量为long类型,长度为64位。当中:0 : (counts & UNARRIVED_MASK); } private static int partiesOf(long s) { return (int)s >>> PARTIES_SHIFT; } private static int phaseOf(long s) { return (int)(s >>> PHASE_SHIFT); } private static int arrivedOf(long s) { int counts = (int)s; return (counts == EMPTY) ? 0 : (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK); }
未达到parties数 (0-15位)
注冊的parties数 (16-31位)
当前阶段号 (32-62位)
结束状态标识 (63位 符号位)
把符号位设置位结束状态能够简单推断state是否为负表示是否结束。另外。假设当phaser没有不论什么注冊parties数,则会用一个无效状态EMPTY(0个已注冊和1个未到达parites数)来区分其他状态。
除此之外,phaser定义了一些静态常量方便对state变量进行移位解析,如*_SHIFT移位和*_MASK掩位。另外另一些特殊值方便计算。另一些辅助方法可以从state提取某些状态值。
动态注冊
接着看看动态注冊parties的实现。动态注冊有两个可用的接口。register方法和bulkRegister方法。当中register方法默认注冊一个party数,bulkRegister方法能够注冊数加上已经注冊的,最大不超过MAX_PARTIES。
public int register() { return doRegister(1); } public int bulkRegister(int parties) { if (parties < 0) throw new IllegalArgumentException(); if (parties == 0) return getPhase(); return doRegister(parties); }两者实现都非常easy,bulkRegister方法中加入了对parties数的检查。两个方法都调用了doRegister方法实现。
private int doRegister(int registrations) { // adjustment to state long adjust = ((long)registrations << PARTIES_SHIFT) | registrations; final Phaser parent = this.parent; int phase; for (;;) { long s = (parent == null) ?doRegister方法做了下面事情:state : reconcileState(); int counts = (int)s; int parties = counts >>> PARTIES_SHIFT; int unarrived = counts & UNARRIVED_MASK; if (registrations > MAX_PARTIES - parties) throw new IllegalStateException(badRegister(s)); phase = (int)(s >>> PHASE_SHIFT); if (phase < 0) break; if (counts != EMPTY) { // not 1st registration if (parent == null || reconcileState() == s) { if (unarrived == 0) // wait out advance root.internalAwaitAdvance(phase, null); else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adjust)) break; } } else if (parent == null) { // 1st root registration long next = ((long)phase << PHASE_SHIFT) | adjust; if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) break; } else { synchronized (this) { // 1st sub registration if (state == s) { // recheck under lock phase = parent.doRegister(1); if (phase < 0) break; // finish registration whenever parent registration // succeeded, even when racing with termination, // since these are part of the same "transaction". while (!UNSAFE.compareAndSwapLong (this, stateOffset, s, ((long)phase << PHASE_SHIFT) | adjust)) { s = state; phase = (int)(root.state >>> PHASE_SHIFT); // assert (int)s == EMPTY; } break; } } } } return phase; }
首先计算注冊后要当前state要调整的值adjust,注意adjust把未到达的parties数和注冊的parties数都设为registrations;接着就进入自循环,首先考虑到堆叠的情况(parent不为null),就要调用reconcileState方法与parent的阶段号同步,并计算出未注冊前正确的state值。然后再依次计算注冊parties数parties,未到达数unarrived。阶段号phase,而且推断注冊后是否超过MAX_PARTIES等一系列的准备工作,接下来就是三个推断区分三种不同的情况:
(1)假设counts!=EMPTY(也即已经有parties注冊),则此时假设parent不为null,则要调用reconcileState方法和当前的状态s对照,这是由于要保证parent和子phaser的阶段号保持一致。假设一致或者parent为null(没有堆叠)。此时能够先拿unarrived值与0推断。假设为0。则表示全部parties已经到达。阶段号添加的情况。因此调用root.internalAwaitAdvance(在没有堆叠的情况下,root为this。假设发生堆叠。root则为整棵树的根节点)来等待添加完毕并再次循环同步,假设unarrived非0,则直接利用CAS把当前state加入adjust。
假设CAS成功就能够break推出循环同步。
其实,这里的unarrived推断相当重要。假设这样的情况,假设此时仅仅剩下最后一个未到达的parties数,而它刚好调用了arrive到达阶段,由于考虑到最后一个到达的party必须运行onAdvance函数,假设此时有新的注冊party,则要又一次等待party完毕,但已经造成错误的onAdvance调用,因此必需要在最后的party到达的时候禁止注冊。到达函数doArrive中有两次CAS的调用(具体实如今以下具体说明),第一次会把当前的未到达状态数变为0(这个CAS同一时候也是表示最后一个party已经到达)。第二次在调用onAdvance后。会又一次设置新的状态值。在doRegister函数里。unarrived的推断能够防止第一个CAS以后(即最后一个party到达)时会运行CAS来注冊新的party(由于此时unarrived==0。doRegister会进入堵塞等待doArrive完毕第二个CAS)。
(2)假设(1)推断失败。而且parent==null时。则表示这次是第一个注冊,因此直接算出新的state值,而且CAS就可以,注意这里没有unarrived推断的优化。由于这次是第一个注冊,所以不会出现全部parties已经到达。阶段号添加的情况。
(3)假设(1)和(2)推断失败。则表示这次是子phaser的第一次注冊。这时我们使用来synchronized的内置锁来防止并发出现。这时由于子phaser第一次注冊。堆叠结构就必需要向parent注冊一次并仅仅有一次。
获取synchronized锁后,再又一次检查一次状态是否发生变化,然后就调用parent.doRegister(1)向父phaser注冊自己。然后假设父phaser注冊成功(返回的phase>=0),就要利用自旋CAS把当前状态加入adjust,注意这个自旋就是强制当前状态值必需要成功注冊,这是由于这个和父phaser注冊都属于同一个原子事务,要在锁里面完毕。否则可能会状态不一致。
doRegister大致逻辑如上。接着我们来看看当中的一些方法实现。首先是reconcileState。
private long reconcileState() { final Phaser root = this.root; long s = state; if (root != this) { int phase, p; // CAS to root phase with current parties, tripping unarrived while ((phase = (int)(root.state >>> PHASE_SHIFT)) != (int)(s >>> PHASE_SHIFT) && !UNSAFE.compareAndSwapLong (this, stateOffset, s, s = (((long)phase << PHASE_SHIFT) | ((phase < 0) ?reconcileState主要目的是和根结点保持阶段号同步。前面说过。假设出现堆叠情况,根结点是最先进行阶段号添加,尽管阶段号添加的操作会逐渐传递到子phaser,但某些同步操作,如动态注冊等,须要立即获悉整棵树的阶段号状态避免多余的CAS,因此就须要显式和根结点保持同步。reconcileState实现就是如此,假设root!=this。即发生堆叠。就利用自旋CAS把当前改动状态值,要注意的是因为阶段号添加。会同一时候会把未到达的parties数设置为原来的注冊parties数。主要实现都是移位和掩位操作,就不再赘述。(s & COUNTS_MASK) : (((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY : ((s & PARTIES_MASK) | p)))))) s = state; } return s; }
awaitAdvance实现
Phaser的一个重要的同步操作就是awaitAdvance系列方法。awaitAdvance是堵塞等待指定阶段号增长的一系列方法。包含
int awaitAdvance(int phase) int awaitAdvanceInterruptibly(int phase) throws InterruptedException int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException这三个方法的实现事实上都大同小异,主要是添加来对中断和超时的控制。详细实现例如以下:
public int awaitAdvance(int phase) { final Phaser root = this.root; long s = (root == this) ? state : reconcileState(); int p = (int)(s >>> PHASE_SHIFT); if (phase < 0) return phase; if (p == phase) return root.internalAwaitAdvance(phase, null); return p; } public int awaitAdvanceInterruptibly(int phase) throws InterruptedException { //省略一样的代码 if (p == phase) { QNode node = new QNode(this, phase, true, false, 0L); p = root.internalAwaitAdvance(phase, node); if (node.wasInterrupted) throw new InterruptedException(); } return p; } public int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { long nanos = unit.toNanos(timeout); //省略一样的代码 if (p == phase) { QNode node = new QNode(this, phase, true, true, nanos); p = root.internalAwaitAdvance(phase, node); if (node.wasInterrupted) throw new InterruptedException(); else if (p == phase) throw new TimeoutException(); } return p; }三者实现大致结构都一样。首先获取当前状态值。假设堆叠则调用reconcileState获取根结点同步后的状态值。然后假设当前阶段号与请求等待的阶段号相等,则调用根结点的internalAwaitAdvance方法(根结点是最先进行阶段号增长)。
internalAwaitAdvance有两个參数,一个是指定等待的阶段号,另外一个是等待结点QNode,假设这个參数为null则会在内部创建一个不会被中断也不会超时的结点来增加队列进行等待,否则就会把參数结点增加队列,因此能够看到awaitAdvance的中断和超时版本号都会自己创建相应的结点传入。先来看看结点QNode的实现:
static final class QNode implements ForkJoinPool.ManagedBlocker { //省略其他成员变量以及构造函数 QNode next; public boolean isReleasable() { if (thread == null) return true; if (phaser.getPhase() != phase) { thread = null; return true; } if (Thread.interrupted()) wasInterrupted = true; if (wasInterrupted && interruptible) { thread = null; return true; } if (timed) { if (nanos > 0L) { long now = System.nanoTime(); nanos -= now - lastTime; lastTime = now; } if (nanos <= 0L) { thread = null; return true; } } return false; } public boolean block() { if (isReleasable()) return true; else if (!timed) LockSupport.park(this); else if (nanos > 0) LockSupport.parkNanos(this, nanos); return isReleasable(); } }Phaser的等待队列使用的是Treiber无锁算法的栈操作。
样例实现能够參考这里。
首先能够注意到QNode类是实现了ForkJoinPool.ManagedBlocker接口,这个接口能够确保假设使用ForkJoinWorkerThread的时候就能够保持并发运行任务。
首先看看isReleaseable的实现。接口定义函数返回true,就不须要接下来的block操作。因此假设当前阶段号已经和指定阶段号不相等,则返回true。另外在推断中断的时候。假设interruptible值(构造函数的时候)为false。则会忽略中断。接着就是一个典型的超时推断逻辑。注意这里在返回true之前都会把thread设为null,表示不须要等待取消,不须要进行唤醒。
block的实现也是非常easy,假设非超时就调用LockSupport.part。否则就调用超时版本号parkNanos。
接下来看看internalAwaitAdvance实现。
//NCPU是当前CPU数量 static final int SPINS_PER_ARRIVAL = (NCPU < 2) ?函数做了下面事情:1 : 1 << 8; private int internalAwaitAdvance(int phase, QNode node) { // assert root == this; releaseWaiters(phase-1); // ensure old queue clean boolean queued = false; // true when node is enqueued int lastUnarrived = 0; // to increase spins upon change int spins = SPINS_PER_ARRIVAL; long s; int p; while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) { if (node == null) { // spinning in noninterruptible mode int unarrived = (int)s & UNARRIVED_MASK; if (unarrived != lastUnarrived && (lastUnarrived = unarrived) < NCPU) spins += SPINS_PER_ARRIVAL; boolean interrupted = Thread.interrupted(); if (interrupted || --spins < 0) { // need node to record intr node = new QNode(this, phase, false, false, 0L); node.wasInterrupted = interrupted; } } else if (node.isReleasable()) // done or aborted break; else if (!queued) { // push onto queue AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ; QNode q = node.next = head.get(); if ((q == null || q.phase == phase) && (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq queued = head.compareAndSet(q, node); } else { try { ForkJoinPool.managedBlock(node); } catch (InterruptedException ie) { node.wasInterrupted = true; } } } if (node != null) { if (node.thread != null) node.thread = null; // avoid need for unpark() if (node.wasInterrupted && !node.interruptible) Thread.currentThread().interrupt(); if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase) return abortWait(phase); // possibly clean up on abort } releaseWaiters(phase); return p; }
1、调用releaseWaiters释放上一个阶段的等待队列;
2、进入while循环。推断条件是保证指定的阶段和当前阶段保持一致,然后有4个推断分支。代表不同情况:
(1)假设參数node为null,即请求的是非堵塞超时等待。接着是一个有关自旋等待的逻辑,考虑到在多核CPU上短时间大量线程唤醒相当慢。因此在这里准备堵塞先加入一个简单的自旋逻辑。详细是这样:初始化一个自旋次数spins为SPINS_PER_ARRIVAL。当此前的未到达parties数unarrived和上次记录的未到达parties数lastUnarrived不相等的时候,而且parites数少于当前CPU数量,则会给当前自旋次数加入一个SPINS_PER_ARRIVAL常量。这样在下一次到达之前都会自旋spins次。假设此时出现阶段号增长,则会退出自旋,就能够避免接下来的堵塞逻辑。但假设在自旋spins次阶段号仍然没有递增(假设此时发生中断则取消自旋进入堵塞),则创建一个非中断超时结点。准备进入等待队列。
(2)假设(1)推断失败,则调用已经创建node的isReleaseable方法,推断是否是否由于中断或者超时等取消当前等待。
(3)这里是一个入队操作,用queued变量保证仅仅入队一次。另外。考虑到上一阶段里假设有结点在释放的时候。刚好当前阶段有入队操作的话会有竞争产生,因此这里採用了两个队列。偶数队列(evenQ)和奇数队列(oddQ)。接着依照Treiber算法的模型,把请求结点入队,注意入队前须要再次推断当前阶段是否已经添加。
(4)最后就是进入堵塞状态,这里调用ForkJoinPool.managedBlock方法把结点堵塞,直到阶段号增长被唤醒,或者发生中断或超时等取消等待。
3、while循环退出后,接着就是结点node状态清理。包含清空thread引用,以及必要的中断状态复位。另外假设在中断和超时的情况下还须要调用abortWait释放队列里面相同是中断和超时的结点。
4、最后再次调用releaseWaiters释放当前阶段号的等待队列。
这里顺便看看releaseWaiters和abortWait的实现。
因为这两者都非常相似,因此一并剖析。
private void releaseWaiters(int phase) { QNode q; // first element of queue Thread t; // its thread AtomicReference<QNode> head = (phase & 1) == 0 ?releaseWaiters方法主要利用自旋从head结点起把队列里的结点出队,假设结点的thread引用为非null。则顺便唤醒。evenQ : oddQ; while ((q = head.get()) != null && q.phase != (int)(root.state >>> PHASE_SHIFT)) { if (head.compareAndSet(q, q.next) && (t = q.thread) != null) { q.thread = null; LockSupport.unpark(t); } } } private int abortWait(int phase) { AtomicReference<QNode> head = (phase & 1) == 0 ?
evenQ : oddQ; for (;;) { Thread t; QNode q = head.get(); int p = (int)(root.state >>> PHASE_SHIFT); if (q == null || ((t = q.thread) != null && q.phase == p)) return p; if (head.compareAndSet(q, q.next) && t != null) { q.thread = null; LockSupport.unpark(t); } } }
另外注意的是。每次出队前都会推断当前结点的阶段号是否与状态的阶段号相等,这里的状态阶段号用的是root.state,这是考虑到堆叠的情况。
abortWait的实现是releaseWaiters的变种,从头结点開始。假设遇到没有被取消等待(thread引用是否为null)而且阶段号与当前相等的正常堵塞结点,就会退出,否则一直释放结点。
arrive实现
Phaser提供了单独表示到达阶段的非堵塞函数,即
int arrive() //一个party到达 int arriveAndDeregister() //一个party到达而且反注冊这个party这两个函数的实现都非常easy:
public int arrive() { return doArrive(ONE_ARRIVAL); } public int arriveAndDeregister() { return doArrive(ONE_DEREGISTER); }主要是调用doArrive实现,doArrive实现例如以下
private int doArrive(int adjust) { final Phaser root = this.root; for (;;) { long s = (root == this) ? state : reconcileState(); int phase = (int)(s >>> PHASE_SHIFT); if (phase < 0) return phase; int counts = (int)s; int unarrived = (counts == EMPTY) ?doArrive看上去非常复杂,但事实上逻辑并不算太复杂。0 : (counts & UNARRIVED_MASK); if (unarrived <= 0) throw new IllegalStateException(badArrive(s)); if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) { if (unarrived == 1) { long n = s & PARTIES_MASK; // base of next state int nextUnarrived = (int)n >>> PARTIES_SHIFT; if (root == this) { if (onAdvance(phase, nextUnarrived)) n |= TERMINATION_BIT; else if (nextUnarrived == 0) n |= EMPTY; else n |= nextUnarrived; int nextPhase = (phase + 1) & MAX_PHASE; n |= (long)nextPhase << PHASE_SHIFT; UNSAFE.compareAndSwapLong(this, stateOffset, s, n); releaseWaiters(phase); } else if (nextUnarrived == 0) { // propagate deregistration phase = parent.doArrive(ONE_DEREGISTER); UNSAFE.compareAndSwapLong(this, stateOffset, s, s | EMPTY); } else phase = parent.doArrive(ONE_ARRIVAL); } return phase; } } }
1、首先依旧是进入一个自旋结构。然后依据是否有堆叠(root == this)来获取正确状态值,接着计算阶段号phase,未到达parties数unarrived。此时假设unarrived少于等于0,则必须抛出异常,表示这次到达是非法的(由于全部的注冊parties数已经到达)。
2、接着就是利用CAS把状态值进行更改。这里是减去參数adjust值,arrive的传入參数是ONE_ARRIVE,也就是1,arriveAndDeregister是ONE_ARRIVAL|ONE_PARTY,减去之后刚好把未到达数和注冊数都减去一。
3、假设CAS成功。假设CAS之前的unarrived刚好为1,则表示此次到达是最后一个未到达party,然后又一次開始计算下一个阶段值n。接着须要依据是否堆叠进行推断:
(1)假设没有堆叠(root == this)则依照定义我们调用onAdvance,传入相对參数。此时假设onAdvance返回true。我们给n加入终结标识,假设onAdvance返回false。但下阶段的未到达parties数(同一时候也是当前注冊的parties数)为0(可能由于反注冊造成),因此要给n加入EMPTY值。否则就给n加入新的未到达parties数。接着就调用CAS把当前状态值更改为n,然后调用releaseWaiters释放上一阶段号的等待队列。注意这里第二个CAS的返回值能够忽略,由于这里与doRegister的冲突已经由doRegister的unarrived推断解决。
(2)假设(1)推断失败,则出现了堆叠,另外假设此时新的未到达数为0(全部之前的注冊parties数都被反注冊),依据堆叠的结构,我们必须向parent表示已经到达一个party而且反注冊自己,而且同一时候把当前状态CAS为EMPTY,相同。这里的CAS能够忽略返回值。
(3)假设(1)(2)推断都失败,则仅仅须要简单地把向parent调用doArrive(ONE_ARRIVE)表示自己当前全部已经注冊的parties数都到达了,然后parent就会减去一个代表这个子phaser的到达parties数。
这里顺便介绍一个比較方便的函数arriveAndAwaitAdvance。从名字上就能够看出,这个函数把arrive和awaitAdvance两个效果都合成在一起。详细实现例如以下:
public int arriveAndAwaitAdvance() { final Phaser root = this.root; for (;;) { long s = (root == this) ?函数的大致结构和doArrive几乎相同,在CAS之后假设unarrived大于1,则须要调用根结点的internalAwaitAdvance进行堵塞等待直到阶段号增长。假设unarrived小于等于1,则假设有堆叠发生(root != this)则调用父phaser的arriveAndAwaitAdvance。否则的话调用onAdvance,而且调用CAS把状态更新,然后调用releaseWaiters把之前的阶段等待队列释放。该函数对照起先调用arrive和awaitAdvance。更加方便而且因为降低了一些多余的变量读取和逻辑。速度更加快。state : reconcileState(); int phase = (int)(s >>> PHASE_SHIFT); if (phase < 0) return phase; int counts = (int)s; int unarrived = (counts == EMPTY) ?
0 : (counts & UNARRIVED_MASK); if (unarrived <= 0) throw new IllegalStateException(badArrive(s)); if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s -= ONE_ARRIVAL)) { if (unarrived > 1) return root.internalAwaitAdvance(phase, null); if (root != this) return parent.arriveAndAwaitAdvance(); long n = s & PARTIES_MASK; // base of next state int nextUnarrived = (int)n >>> PARTIES_SHIFT; if (onAdvance(phase, nextUnarrived)) n |= TERMINATION_BIT; else if (nextUnarrived == 0) n |= EMPTY; else n |= nextUnarrived; int nextPhase = (phase + 1) & MAX_PHASE; n |= (long)nextPhase << PHASE_SHIFT; if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n)) return (int)(state >>> PHASE_SHIFT); // terminated releaseWaiters(phase); return nextPhase; } } }
总结
Phaser的同步模型于CountDownLatch、CyclicBarrier类似,但提供了更加灵活的同步支持,另外因为实现採用了无锁算法,整个同步操作的实现变得更加复杂。
考虑到高并发条件下的CAS竞争,也提供了不同机制去优化性能(堆叠,自旋等)。
以上是关于Phaser实现源代码剖析的主要内容,如果未能解决你的问题,请参考以下文章