腾讯一面 Java Phaser 并发编程的理解与应用
Posted 陈皮的JavaLib
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了腾讯一面 Java Phaser 并发编程的理解与应用相关的知识,希望对你有一定的参考价值。
我是陈皮,一个在互联网 Coding 的 ITer,微信搜索「陈皮的JavaLib」第一时间阅读最新文章,回复【资料】,即可获得我精心整理的技术资料,电子书籍,一线大厂面试资料和优秀简历模板。
Phaser
Phaser
是 JDK1.7 推出的,一个可重用的同步障碍,与CyclicBarrier
,CountDownLatch
功能类似,但是它支持更灵活的用法。
先简单说明这个类的作用。假设有一个大工程,可以分为多个阶段,每一个阶段有多个人参与,并且每一个阶段需要参与的所有人都完成这个阶段的事情,才可以进入下一个阶段,然后所有人又继续做下一个阶段的事,直到所有阶段都完成,当然这途中每个人都可以随时退出,整个工程也可以中途终止。
例如某天陈皮约小美,小雪去他家吃饭。这个事情可以分为三个阶段,第一阶段去超市买食材,第二阶段炒菜,第三阶段吃饭,假设每一个阶段完成后才能继续下一个阶段。
首先定义阶段器类,继承 Phaser 重写 onAdvance 方法来对每一个阶段进行不同的操作。
package com.chenpi;
import java.util.concurrent.Phaser;
/**
* @Description 吃饭阶段器
* @Author 陈皮
* @Date 2021/7/4
* @Version 1.0
*/
public class DiningPhaser extends Phaser {
/**
* 每一阶段到达时,都会执行这个方法,执行完后 phase 自动加1,代表进入下一阶段
*
* @param phase 代表哪个阶段,从0开始
* @param registeredParties 注册的任务
* @return 是否终止
*/
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
System.out.println("第一阶段,买食材完成啦!总共参与人数:" + registeredParties);
return false;
case 1:
System.out.println("第二阶段,炒菜完成啦!总共参与人数:" + registeredParties);
return false;
case 2:
System.out.println("第三阶段,吃完饭啦!总共参与人数:" + registeredParties);
return false;
default:
return true;
}
}
}
接下来定义参与的任务,编写每一个任务在每一个阶段需要干的事情。
package com.chenpi;
import java.util.concurrent.Phaser;
/**
* @Description 任务,代表陈皮
* @Author 陈皮
* @Date 2021/7/4
* @Version 1.0
*/
public class ChenPiTask implements Runnable {
private Phaser phaser;
public ChenPiTask(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 买好猪肉了...");
// 第一阶段的事干完了,等待其他人完成才能进入下一阶段
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " 炒好猪肉了...");
// 第二阶段的事干完了,等待其他人完成才能进入下一阶段
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " 吃饱了...");
// 第三阶段的事干完了,等待其他人完成才能进入下一阶段
phaser.arriveAndAwaitAdvance();
}
}
package com.chenpi;
import java.util.concurrent.Phaser;
/**
* @Description 任务,代表小美
* @Author 陈皮
* @Date 2021/7/4
* @Version 1.0
*/
public class XiaoMeiTask implements Runnable {
private Phaser phaser;
public XiaoMeiTask(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 买好白菜了...");
// 第一阶段的事干完了,等待其他人完成才能进入下一阶段
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " 炒好白菜了...");
// 第二阶段的事干完了,等待其他人完成才能进入下一阶段
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " 吃饱了...");
// 第三阶段的事干完了,等待其他人完成才能进入下一阶段
phaser.arriveAndAwaitAdvance();
}
}
package com.chenpi;
import java.util.concurrent.Phaser;
/**
* @Description 任务,代表小雪
* @Author 陈皮
* @Date 2021/7/4
* @Version 1.0
*/
public class XiaoXueTask implements Runnable {
private Phaser phaser;
public XiaoXueTask(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 买好鲍鱼了...");
// 第一阶段的事干完了,等待其他人完成才能进入下一阶段
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " 炒好鲍鱼了...");
// 第二阶段的事干完了,等待其他人完成才能进入下一阶段
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " 吃饱了...");
// 第三阶段的事干完了,等待其他人完成才能进入下一阶段
phaser.arriveAndAwaitAdvance();
}
}
最后,编写测试类,进行测试验证。
package com.chenpi;
/**
* @Description
* @Author 陈皮
* @Date 2021/7/4
* @Version 1.0
*/
public class ChenPiMain {
public static void main(String[] args) {
// 创建吃饭阶段器,注册3个任务(人)
DiningPhaser diningPhaser = new DiningPhaser();
diningPhaser.bulkRegister(3);
// 三个人同时开始干活
Thread thread1 = new Thread(new ChenPiTask(diningPhaser));
thread1.setName("陈皮");
thread1.start();
Thread thread2 = new Thread(new XiaoMeiTask(diningPhaser));
thread2.setName("小美");
thread2.start();
Thread thread3 = new Thread(new XiaoXueTask(diningPhaser));
thread3.setName("小雪");
thread3.start();
}
}
最后,启动服务,显示结果如下。
陈皮 买好猪肉了...
小美 买好白菜了...
小雪 买好鲍鱼了...
第一阶段,买食材完成啦...
小雪 炒好鲍鱼了...
陈皮 炒好猪肉了...
小美 炒好白菜了...
第二阶段,炒菜完成啦...
小美 吃饱了...
陈皮 吃饱了...
小雪 吃饱了...
第三阶段,吃完饭啦...
Phaser 详解
通过以上简单例子已知道 Phaser 的作用了。其实它的作用不止这些。
可以动态调整注册任务的数量(最大注册的任务数量为65535)。任务可以在任何时间注册(register
和 bulkRegister
方法,或者以构造方法形式注册初始任务数量),也可以在任何到达时注销注册的任务(arriveAndDeregister
方法)。
package com.chenpi;
import java.util.concurrent.Phaser;
/**
* @Description 任务,代表小雪
* @Author 陈皮
* @Date 2021/7/4
* @Version 1.0
*/
public class XiaoXueTask implements Runnable {
private Phaser phaser;
public XiaoXueTask(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 买好鲍鱼了...");
// 第一阶段的事干完了,等待其他人完成才能进入下一阶段
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " 炒好鲍鱼了...");
// 第二阶段的事干完了,小雪有事先走了
phaser.arriveAndDeregister();
System.out.println(Thread.currentThread().getName() + "有事先走了");
}
}
我们修改小雪这个任务,她干完第二阶段的事情就有事先走了,即注销任务。结果如下:
陈皮 买好猪肉了...
小美 买好白菜了...
小雪 买好鲍鱼了...
第一阶段,买食材完成啦!总共参与人数:3
小雪 炒好鲍鱼了...
小雪有事先走了
陈皮 炒好猪肉了...
小美 炒好白菜了...
第二阶段,炒菜完成啦!总共参与人数:2
小美 吃饱了...
陈皮 吃饱了...
第三阶段,吃完饭啦!总共参与人数:2
注册和注销只影响内部计数,内部没有记录具体的注册任务,所以不能查询哪个任务是否已注册。但是我们可以编写 Phaser 的子类来实现记录注册的具体任务。
package com.chenpi;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Phaser;
/**
* @Description 吃饭阶段器
* @Author 陈皮
* @Date 2021/7/4
* @Version 1.0
*/
public class DiningPhaser extends Phaser {
// 记录注册的任务
private List<Runnable> registeredTask = new ArrayList<>();
public int register(Runnable task) {
registeredTask.add(task);
return super.register();
}
public List<Runnable> getRegisteredTask() {
return registeredTask;
}
/**
* 每一阶段到达时,都会执行这个方法,执行完后 phase 自动加1,代表进入下一阶段
*
* @param phase 代表哪个阶段,从0开始
* @param registeredParties 注册的任务
* @return 是否终止
*/
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
System.out.println("第一阶段,买食材完成啦!总共参与人数:" + registeredParties);
return false;
case 1:
System.out.println("第二阶段,炒菜完成啦!总共参与人数:" + registeredParties);
return false;
case 2:
System.out.println("第三阶段,吃完饭啦!总共参与人数:" + registeredParties);
return false;
default:
return true;
}
}
}
测试类中,每一个任务单独注册并记录。
package com.chenpi;
/**
* @Description
* @Author 陈皮
* @Date 2021/7/4
* @Version 1.0
*/
public class ChenPiMain {
public static void main(String[] args) {
// 创建吃饭阶段器,注册3个任务(人)
DiningPhaser diningPhaser = new DiningPhaser();
// 三个人同时开始干活
Thread thread1 = new Thread(new ChenPiTask(diningPhaser));
thread1.setName("陈皮");
diningPhaser.register(thread1);
thread1.start();
Thread thread2 = new Thread(new XiaoMeiTask(diningPhaser));
thread2.setName("小美");
diningPhaser.register(thread2);
thread2.start();
Thread thread3 = new Thread(new XiaoXueTask(diningPhaser));
thread3.setName("小雪");
diningPhaser.register(thread3);
thread3.start();
System.out.println("注册的任务:" + diningPhaser.getRegisteredTask());
}
}
启动服务,打印了在阶段器中注册的任务。
陈皮 买好猪肉了...
注册的任务:[Thread[陈皮,5,main], Thread[小美,5,main], Thread[小雪,5,main]]
小美 买好白菜了...
小雪 买好鲍鱼了...
第一阶段,买食材完成啦!总共参与人数:3
小雪 炒好鲍鱼了...
陈皮 炒好猪肉了...
小美 炒好白菜了...
第二阶段,炒菜完成啦!总共参与人数:3
陈皮 吃饱了...
小雪 吃饱了...
小美 吃饱了...
第三阶段,吃完饭啦!总共参与人数:3
对于同步性质,像 CyclicBarrier 一样,Phaser 可以重复等待。Phaser 的 arriveAndAwaitAdvance
方法的作用类似于 CyclicBarrier 的 await
方法。
每一个 Phaser 对象都会关联一个阶段数。这个数从0开始,当所有注册的任务都到达每一个阶段的时候,这个数就递增一次。特别地,如果这个数到达 Integer.MAX_VALUE
后就会重新变回到0。
arrive
和 arriveAndDeregister
方法记录到达,这2个方法不会阻塞,它们会返回已到达的阶段数。
arrive 方法表示当前任务已到达某个阶段,但是不会等待其他任务到达此阶段。arriveAndDeregister 方法表示当前任务已到达某个阶段,并且注销任务。
在每一个阶段中,当所有任务都到达的时候,onAdvance
方法会被最后一个触发阶段到达的任务执行,然后进入下一个阶段。onAdvance 方法可以控制一个 Phaser 的终止,如果我们的 Phaser 对象是继承 Phaser 的子类,可以重写 onAdvance 方法,在每一个阶段到达时这个方法就会被调用从而在每一个阶段做我们想做的事情。
package com.chenpi;
import java.util.concurrent.Phaser;
/**
* @Description 吃饭阶段器
* @Author 陈皮
* @Date 2021/7/4
* @Version 1.0
*/
public class DiningPhaser extends Phaser {
/**
* 每一阶段到达时,都会执行这个方法,执行完后 phase 自动加1,代表进入下一阶段
*
* @param phase 代表哪个阶段,从0开始
* @param registeredParties 注册的任务
* @return 是否终止
*/
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(Thread.currentThread().getName() + " 调用了onAdvance方法");
switch (phase) {
case 0:
System.out.println("第一阶段,买食材完成啦!总共参与人数:" + registeredParties);
return false;
case 1:
System.out.println("第二阶段,炒菜完成啦!总共参与人数:" + registeredParties);
return false;
case 2:
System.out.println("第三阶段,吃完饭啦!总共参与人数:" + registeredParties);
return false;
default:
return true;
}
}
}
我们在 onAdvance 方法中打印当前线程,结果表明确实是最后一个触发阶段到达的任务执行 onAdvance 方法,如下:
陈皮 买好猪肉了...
小美 买好白菜了...
小雪 买好鲍鱼了...
小雪 调用了onAdvance方法
第一阶段,买食材完成啦!总共参与人数:3
小雪 炒好鲍鱼了...
陈皮 炒好猪肉了...
小美 炒好白菜了...
小美 调用了onAdvance方法
第二阶段,炒菜完成啦!总共参与人数:3
小美 吃饱了...
陈皮 吃饱了...
小雪 吃饱了...
小雪 调用了onAdvance方法
第三阶段,吃完饭啦!总共参与人数:3
phaser 可以随时被终止。当终止时,所有同步方法(例如 arriveAndAwaitAdvance )会立即返回而不用阻塞等待,并且返回一个负数。同样地,被终止后无法再注册任务。isTerminated
方法可以判断是否已经终止。
phaser 可以在 onAdvance
方法中返回 true
来达到终止的效果。例如我们继承 Phaser 编写的子类可以重写此方法,当到达某个阶段后返回 true 来终止阶段器。
package com.chenpi;
import java.util.concurrent.Phaser;
/**
* @Description 吃饭阶段器
* @Author 陈皮
* @Date 2021/7/4
* @Version 1.0
*/
public class DiningPhaser extends Phaser {
/**
* 每一阶段到达时,都会执行这个方法,执行完后 phase 自动加1,代表进入下一阶段
*
* @param phase 代表哪个阶段,从0开始
* @param registeredParties 注册的任务
* @return 是否终止
*/
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(Thread.currentThread().getName() + " 调用了onAdvance方法");
switch (phase) {
case 0:
System.out.println("第一阶段,买食材完成啦!总共参与人数:" + registeredParties);
return false;
case 1:
System.out.println("第二阶段,炒菜完成啦!总共参与人数:" + registeredParties);
return false;
case 2:
System.out.println("第三阶段,吃完饭啦!总共参与人数:" + registeredParties);
return false;
default:
return true;
}
}
}
默认的 onAdvance 方法实现是当注册的任务注销到0的时候返回 true,源码如下:
protected boolean onAdvance(int phase, int registeredParties) {
return registeredParties == 0;
}
phaser 的 forceTermination
方法其实也可以强制终止阶段器。还是以上述例子,小美干完第一阶段的事情后,觉得这样等来等去太费时间了,所以终止这个阶段器。
package com.chenpi;
import java.util.concurrent.Phaser;
/**
* @Description 任务,代表小美
* @Author 陈皮
* @Date 2021/7/4
* @Version 1.0
*/
public class XiaoMeiTask implements Runnable {
private Phaser phaser;
public XiaoMeiTask(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
Java线程与并发编程实践----同步器(Phaser)