源码分析-Phaser
Posted 千念飞羽
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了源码分析-Phaser相关的知识,希望对你有一定的参考价值。
Phaser
使用方法
这是一个比CyclicBarrier更加灵活的同步屏障。从灵活性的角度来说Phaser>CyclicBarrier>CountDownLatch。
Phaser中有一个概念叫阶段(用phaser)表示,这个只能增长不能减小。每个阶段可以有不同数量的分阶段(party,不知道怎么翻译了。先用这个)。当有足够数量的分阶段到达的时候就进入下一个阶段。
获得Phaser引用后可以用通过阻塞等待Phaser执行到某个阶段。有多种阻塞的方式, awaitAdvance(int phase),awaitAdvanceInterruptibly(int phase), awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)等等。
每个分阶段差不多可以代表一个子任务,通过多线程的方式去执行。当执行完了之后就表示当前的任务到达(arrive)当前屏障,这里有几个方法可以用来控制工作线程执行完任务后到达屏障后需要做什么,arrive()什么都不做,arriveAndAwaitAdvance()到达后阻塞等待其他线程, arriveAndDeregister()到达后注销线程。这里还有一点需要说明的是Phaser只通过一个数字去表示当前有多少任务注册,多少任务到达,没有办法查询某个具体的任务是否已经注册。
如何去确定当前阶段有多少个分阶段?当然在第一个阶段可以通过构造器去设置,在后续的阶段,如果要增加分阶段可以通过bulkRegister或者register增加,如果要减少则需要在arriveRegister来注销。这两种方式来控制每个阶段的子阶段数量。
当然还有一些其他方法可以用来监控当前的Phaser当前的阶段和子阶段和到达状态,以及强制关闭当前Phaser等等监控方法。
此外Phaser还有一个特性就是它可以组织成一个树的形式。这样可以在大数量的情况下更好的工作。
工作原理
首先状态控制方面,这里先简单说一下,其实在之前看并发的类的时候也经常用到这种方法,也就是通过将所有的状态变化都包装到一个int或者long中这样可以尽可能的减少同步区的长度。如果用非阻塞的算法的话,还可以通过cas的方法去设置。这样的编程方式是并发编程中常用的方法。
Phaser的状态通过一个volatile的long来表示state:
- 0-15位 :当前未达到屏障的子阶段
- 16-31位:等待的子阶段
- 32-2位:当前阶段号
- 63位:当前屏障是否终止
如果是空状态,也就是没有子阶段注册的初始阶段。这里用一个EMPTY状态表示,也就是0个子阶段和一个未到达阶段
所有的状态变化都是通过CAS操作执行的。唯一的例外是注册一个子相移器(sub-Phaser)(用于构成树的,也就是Phaser的父Phaser非空),这个子相移器的分阶段通过一个内置锁来设置。
首先是一系列 静态变量和操作state变量的静态方法。这里就不一一看看。
其次来看一下一个Phaser实例的内部非静态类:
private volatile long state;
private final Phaser parent;
private final Phaser root;
private final AtomicReference<QNode> evenQ;
private final AtomicReference<QNode> oddQ;
其的都比较好理解,这里的evenQ和oddQ是根据phaser的奇偶状态来设置的。
这是用来储存等待的线程的。为了避免竞争,这里使用了Phaser的数值奇偶来储存,此外对于子相移器,它与其根相移器使用同一个evenQ
或者oddQ,以加速释放。
主要方法
首先来看三个private的核心方法:doArrive、doRegister、reconcileState
doArrive
doArrive是用来完成任务完成后到达的操作的
private int doArrive(boolean deregister)
int adj = deregister ? ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL;//通过传入参数判断有哪些参数需要减1。
final Phaser root = this.root;
for (;;)
long s = (root == this) ? state : reconcileState();//获取当前状态,以及并解析当前参数。
int phase = (int)(s >>> PHASE_SHIFT);
int counts = (int)s;
int unarrived = (counts & UNARRIVED_MASK) - 1;
if (phase < 0)//phase为负说明出现特殊情况则将phase返回。
return phase;
else if (counts == EMPTY || unarrived < 0) //如果状态为空或者未到达线程为负,则逻辑上不应该存在线程到达,
if (root == this || reconcileState() == s)//如果root为this则说明状态出错抛出异常,但是如果该相移器还有父相移器,则还有可能出现相位传播的延迟,这里交给reconcileState来判断,如果依然出现非法状态则抛出异常。reconcileState后面会说到。
throw new IllegalStateException(badArrive(s));
else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) //完成条件判断后,尝试CAS设置当前状态。
if (unarrived == 0) //如果当前到达是该阶段最后一个到达的程序则需要进入下一个阶段。
long n = s & PARTIES_MASK; // base of next state//保留子阶段数值。
int nextUnarrived = (int)n >>> PARTIES_SHIFT;//设置下一个阶段你的数值。
if (root != this)//如果当前phaser有根节点则调用父节点的根节点。
return parent.doArrive(nextUnarrived == 0);
if (onAdvance(phase, nextUnarrived))//判断是否可以补进当前节点,实际上这个函数判断是就是nextUnarrived是否是0如果是0则不应该补进,如果不应该补进则返回真,这时候就将phaser终止。这里之所以还专门用一个onAdvance实际上是提供一个hook方法,为后续的实现提供方便。
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)//如果不应该终止,而且nextUnarrived又为0,则需要专门设置一个空状态。理由之前说过。
n |= EMPTY;
else//当然更普遍的情况下还是只是设置一下下一个阶段未到达线程数量。
n |= nextUnarrived;
n |= (long)((phase + 1) & MAX_PHASE) << PHASE_SHIFT;//构造一个新的state变量。并使用CAS的方式去设置他。
UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
releaseWaiters(phase);//释放所有等的节点。
return phase;//返回phase数字
这里有两个小的方法需要专门说一下:
首先是hook方法onAdvance:
这个方法会在相位步进的时候执行。如果返回true则相移器会进入到终止状态。调用这个方法产生的非检查异常和Error会传播给该阶段的子线程并抛出。
输入参数是当前相移器的阶段。在执行onAdvance的过程中不应该调用arrival。register。和等待方法都是不可靠的。
如果这个相移器是一个相移器树集的一部分。则onAdvance只对其root节点进行判断。
这个方法的默认行为就是当下个阶段没有注册的子阶段则返回true;
protected boolean onAdvance(int phase, int registeredParties)
return registeredParties == 0;
其次是releaseWaiters方法。
private void releaseWaiters(int phase)
QNode q; // first element of queue
Thread t; // its thread
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;//区别奇偶是为了简化操作。防止在进入新的阶段有新的应该阻塞的线程也释放掉。
while ((q = head.get()) != null &&
q.phase != (int)(root.state >>> PHASE_SHIFT)) //这里需要说明的是当执行releaseWaiters时,已经将state步进入了下一阶段。所以这里只要Qnode中所保存的阶段与当前阶段不同就应该消除。
if (head.compareAndSet(q, q.next) &&
(t = q.thread) != null)
q.thread = null;
LockSupport.unpark(t);//解锁当前线程。
doRegister
private int doRegister(int registrations)
// adjustment to state
long adj = ((long)registrations << PARTIES_SHIFT) | registrations;//计算出需要调整的量。
final Phaser parent = this.parent;//查看可能存在的相移器
int phase;
for (;;)
long s = state;//计算当前状态的参数。
int counts = (int)s;
int parties = counts >>> PARTIES_SHIFT;
int unarrived = counts & UNARRIVED_MASK;
if (registrations > MAX_PARTIES - parties)//如果需要注册的数量超过运行注册的最大值,则抛出异常状态异常
throw new IllegalStateException(badRegister(s));
else if ((phase = (int)(s >>> PHASE_SHIFT)) < 0)//如果当前状态为终止状态则跳出循环直接退出。
break;
else if (counts != EMPTY) // not 1st registration//如果当前状态不是第一此注册线程。
if (parent == null || reconcileState() == s) //如果当相移器的父相移器为空,则直接信息CAS,如果当前相移器部位空则调用reconcileState处理,这个稍后再看。reconcileState这里主要为了防止出现同步性错误。
if (unarrived == 0) // wait out advance
root.internalAwaitAdvance(phase, null);
else if (UNSAFE.compareAndSwapLong(this, stateOffset,
s, s + adj))
break;
//当前状态是第一次注册。如果如果当前相移器没有父相移器。则直接进行CAS
else if (parent == null) // 1st root registration
long next = ((long)phase << PHASE_SHIFT) | adj;
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
break;
else //如果当前是第一次设置,并且该相移器被组织在一个树中则需要考虑一下,则需要使用内置锁来进如
synchronized (this) // 1st sub registration
if (state == s) // recheck under lock//这里有可能发生竞争。所以这里还需要检查一下,如果失败则需退出同步区重新尝试进入。
parent.doRegister(1);//调用其父相移器的注册方法
do // force current phase
phase = (int)(root.state >>> PHASE_SHIFT);//使用其父相移器的状态的阶段数覆盖当前相移器的状态的阶段数。
// assert phase < 0 || (int)state == EMPTY;
while (!UNSAFE.compareAndSwapLong
(this, stateOffset, state,
((long)phase << PHASE_SHIFT) | adj));
break;
return phase;
这里有一个需要重点看一下的方法即reconcileState
这里方法主要是为了处理在树构造中可能存在的相位延迟问题。比如有时候当父相移器已经步进了,但是其子相移器并没有步进。这很正常。这时候需要使得子相移器的未到达子阶段为0。*(或者子阶段数为0,折重新设置未注册的空状态)。然而这个方法也会导致也有可能会有一些浮动的子相移器想要设置未到达子阶段数量纯粹只是为了赶上当前线程,这样的情况下会调用这个方法。这时候计数不会受到影响。
private long reconcileState()
final Phaser root = this.root;
long s = state;
if (root != this)
int phase, u, p;
// CAS root phase with current parties; possibly trip unarrived
//下面这个while语句比较的麻烦。实际上就是干了一件事,当子相移器和父相移器的阶段不同的时候重新设置当前相移器的状态。
while ((phase = (int)(root.state >>> PHASE_SHIFT)) != (int)(s >>> PHASE_SHIFT) &&
!UNSAFE.compareAndSwapLong
(this, stateOffset, s,
s = (((long)phase << PHASE_SHIFT) |
(s & PARTIES_MASK) |
((p = (int)s >>> PARTIES_SHIFT) == 0 ? EMPTY :
(u = (int)s & UNARRIVED_MASK) == 0 ? p : u))))
s = state;
return s;
arriveAndAwaitAdvance
相移器的工作线程调用这个方法来完成该相移器子阶段所指定的任务。并且等待其他子阶段完成或者相移器被终止。
public int arriveAndAwaitAdvance()
// Specialization of doArrive+awaitAdvance eliminating some reads/paths
final Phaser root = this.root;
for (;;)
long s = (root == this) ? state : reconcileState();//s为当前的相移器的状态。并保证了与父相移器的同步,之后计算各个参数
int phase = (int)(s >>> PHASE_SHIFT);
int counts = (int)s;
int unarrived = (counts & UNARRIVED_MASK) - 1;
if (phase < 0)//如果phase小于0则说明相移器终止。返回这个相移器。
return phase;
else if (counts == EMPTY || unarrived < 0) //如果当前状态为空。未到达数为负数则状态异常。不太明白这里的意思。
if (reconcileState() == s)//如果没有出现与父相移器的一致性问题则说明确实有错,则抛出异常,否则进行下一轮读取。
throw new IllegalStateException(badArrive(s));
else if (UNSAFE.compareAndSwapLong(this, stateOffset, s,//没有出现异常情况则尝试CAS设置一个到达状态如果成功则需要进行后续判断
s -= ONE_ARRIVAL))
if (unarrived != 0)//成功到达一个子阶段后相移器未到达子阶段不为0。则需要将其情况反馈到根相移器中。internalAwaitAdvance只会被根相移器调用。并且会阻塞直到阶段步进或者被打断。
return root.internalAwaitAdvance(phase, null);
if (root != this)//到达这里必定未到达子阶段为0,因为如果当前调用时不为0,则由于调用了root.internalAwaitAdvance(phase, null);会阻塞直至到阶段步进了为止如果恰巧是最后一个到达的则直接到达这里。若存在父相移器则调用器父阶段的到达等待。
return parent.arriveAndAwaitAdvance();
long n = s & PARTIES_MASK; // base of next state//到达这里说明所有阶段都已到达,且当前相移器没有父相移器,当前其自身是有可能有子相移器的。下面要做的事情就是设置当前的父状态为下一个阶段的状态。
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
n |= EMPTY;
else
n |= nextUnarrived;
int nextPhase = (phase + 1) & MAX_PHASE;
n |= (long)nextPhase << PHASE_SHIFT;
if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
return (int)(state >>> PHASE_SHIFT); // terminated
releaseWaiters(phase);
return nextPhase;
这里有一个方法需要注意一下:
internalAwaitAdvance方法。实际上Phaser中阻塞都是通过这个语句实现的。这个语句必须通过根相移器调用。换句话说所有的阻塞都是在根相移器阻塞的。
输入参数中phase是需要阻塞的阶段。node是用来跟踪可能中断的阻塞节点。
private int internalAwaitAdvance(int phase, QNode node)
releaseWaiters(phase-1); // ensure old queue clean
boolean queued = false; // true when node is enqueued//所需要的节点是否已经入队。
int lastUnarrived = 0; // to increase spins upon change//用来记录循环中上一次的未到达数量
int spins = SPINS_PER_ARRIVAL;
long s;
int p;
while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) //在当前阶段不变的情况下进入循环。
if (node == null) // spinning in noninterruptible mode//如果不需要中断则使用这半段程序
int unarrived = (int)s & UNARRIVED_MASK;
if (unarrived != lastUnarrived &&
(lastUnarrived = unarrived) < NCPU)
spins += SPINS_PER_ARRIVAL;//SPINS_PER_ARRIVAL代表了每次到达后自旋的次数。
boolean interrupted = Thread.interrupted();
if (interrupted || --spins < 0) // need node to record intr//如果当前线程中断或者自旋到头了则需要构造一个新的QNode并且设置当前中断的状态。也就是说如果在自旋的过程中没有中断则需要入队阻塞。后等待任务完成或者中断。
node = new QNode(this, phase, false, false, 0L);
node.wasInterrupted = interrupted;
else if (node.isReleasable()) // done or aborted//此外这里也是唯一的中断出口。如果自旋阶段就有中断就直接跳出跳出循环,这样就不需要入队之类的麻烦的问题。
break;
else if (!queued) // push onto queue//进行到这里说明需要入队。
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
QNode q = node.next = head.get();
if ((q == null || q.phase == phase) &&
(int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq//这里给出的理由是防止延迟的相位。但是我不太明白这里为什么出现这个。因为我觉得这里应该不出现这个。如果成功通过判断则CAS设置当前QNode节点。
queued = head.compareAndSet(q, node);
else
try
ForkJoinPool.managedBlock(node);//使用一个ForkJoinPool来管理阻塞。
catch (InterruptedException ie)
node.wasInterrupted = true;//如果节点被中断了则设置当前节点的中断标志
if (node != null) 针对退出的原因进行处理。
if (node.thread != null)//如果node的thread不是null则需要将其设为null。这样就不会再releaseWait或者abortWait中接触阻塞。这里想的不是太明白。大概的意思应该是当程序进行到这里,当前节点应该已经完成任务或者中断了。所以不需要再release中接触阻塞。thread为null则不会再接触阻塞了。
node.thread = null; // avoid need for unpark()
if (node.wasInterrupted && !node.interruptible)//判断当前线程是否是中断产生的。如果是则需要传播这个线程。
Thread.currentThread().interrupt();
if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)//如果是是由于任务完成而退出了上一段的循环,则phase不可能不增加,既然这里增加说明这里node是由于中断产生的。而同时又没出现一致性错误则尝试abortWait。这是releaseWait的变体。尝试清理Qnode链表中的由于中断而不需要等待的节点
return abortWait(phase); // possibly clean up on abort
releaseWaiters(phase);//能进行到这里说明phase已经步进了。也就是说当前的QNode是由于任务结束而完成的。所以正常调用releaseWaiters而后退出线程。
return p;
此外这里还对比一下两个释放QNode节点的方法:
针对第一个方法releaseWaiters方法是不断的从链表中不断的找到节点。然后对于需要解锁的线程进行解锁。
abortWait则是和releaseWaiters类似。只是当首节点的QNode的Thread为null,也就是当前QNode被中断等非正常情况下运行CAS设置其next。
/**
* Removes and signals threads from queue for phase.
*/
private void releaseWaiters(int phase)
QNode q; // first element of queue
Thread t; // its thread
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
while ((q = head.get()) != null &&
q.phase != (int)(root.state >>> PHASE_SHIFT))
if (head.compareAndSet(q, q.next) &&
(t = q.thread) != null)
q.thread = null;
LockSupport.unpark(t);
/**
* Variant of releaseWaiters that additionally tries to remove any
* nodes no longer waiting for advance due to timeout or
* interrupt. Currently, nodes are removed only if they are at
* head of queue, which suffices to reduce memory footprint in
* most usages.
*
* @return current phase on exit
*/
private int abortWait(int phase)
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
for (;;)
Thread t;
QNode q = head.get();
int p = (int)(root.state >>> PHASE_SHIFT);
if (q == null || ((t = q.thread) != null && q.phase == p))
return p;
if (head.compareAndSet(q, q.next) && t != null)
q.thread = null;
LockSupport.unpark(t);
QNode
QNode用来跟踪当前线程的信息的。QNode被组织成单向链表的形式。用来管理是否阻塞或者被中断。
QNode继承自ForkJoinPool.ManagedBlocker。ForkJoinPool来管理是否阻塞和中断状态。这里只需要重写isReleasable和block。isReleaseable用于判断是否释放当前节点。block用于阻塞。
整体代码比较简单。只需要提醒一点就是在isReleasable中使用了thread=null来使得避免解锁任务。使用方法类似于internalAwaitAdvance中的用法。
static final class QNode implements ForkJoinPool.ManagedBlocker
final Phaser phaser;
final int phase;
final boolean interruptible;
final boolean timed;
boolean wasInterrupted;
long nanos;
long lastTime;
volatile Thread thread; // nulled to cancel wait
QNode next;
QNode(Phaser phaser, int phase, boolean interruptible,
boolean timed, long nanos)
this.phaser = phaser;
this.phase = phase;
this.interruptible = interruptible;
this.nanos = nanos;
this.timed = timed;
this.lastTime = timed ? System.nanoTime() : 0L;
thread = Thread.currentThread();
public boolean isReleasable()
if (thread == null)
return true;
if (phaser.getPhase() != phase)
thread = null;
return true;
if (Thread.interrupted())
wasInterrupted = true;
if (wasInterrupted && interruptible)
thread = null;
return true;
if (timed)
if (nanos > 0L)
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
if (nanos <= 0L)
thread = null;
return true;
return false;
public boolean block()
if (isReleasable())
return true;
else if (!timed)
LockSupport.park(this);
else if (nanos > 0)
LockSupport.parkNanos(this, nanos);
return isReleasable();
以上是关于源码分析-Phaser的主要内容,如果未能解决你的问题,请参考以下文章