Phaser并发阶段器

Posted Java面试365

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Phaser并发阶段器相关的知识,希望对你有一定的参考价值。

Phaser并发阶段器

Phaser由JDK1.7提出,是一个复杂强大的同步辅助类,是对同步工具类CountDownLatch和CyclicBarrier的综合升级,能够支持分阶段实现等待的业务场景。

我们可以回忆下CountDownLatch讲的是先指定N个线程,在N个线程干完活之前,其它线程都需要等待(导游等待旅游团所有人上车才能开车),而CyclicBarrier讲的是先指定N个线程。等N个线程到齐了大家同时干活(多个驴友相约去旅游,先到的需要等待后来的),而Phaser是两者的结合,可以理解为先指定N个线程,等N个线程到齐后开始干第一阶段的活,等第一阶段所有的线程都干完活了,接着N个线程开始干第二阶段的活,直到所有的阶段完成工作,程序结束,当然需要注意的是每个阶段可以根据业务需要新增或者删除一些线程,并不是开始指定多少个线程每个阶段就必须有多少个线程。

入门体验

看了概念可能不容易理解,从一个小demo入手体验下

public class PhaserDemo1 
    // 指定随机种子
    private static Random random = new Random(System.currentTimeMillis());

    public static void main(String[] args) 
        Phaser phaser = new Phaser();
        // 将线程注册到phaser
        phaser.register();
        for (int i = 0; i <5 ; i++) 
            Task task = new Task(phaser);
            task.start();
        
        phaser.arriveAndAwaitAdvance();
        System.out.println("all task execute close");
    

    static class Task extends Thread
        Phaser phaser;

        public Task(Phaser phaser)
            this.phaser = phaser;
            this.phaser.register();
        

        @Override
        public void run() 
            try 
                System.out.println(Thread.currentThread().getName()+"开始执行");
                TimeUnit.SECONDS.sleep(random.nextInt(5));
                System.out.println(Thread.currentThread().getName()+"执行完毕");
                // 类似CountDownLatch中的 await
                phaser.arriveAndAwaitAdvance();
             catch (InterruptedException e) 
                e.printStackTrace();
            
        
    


不知道有没有这样的疑惑,phaser.register是向phaser去注册这个线程,那么为什么主线程也需要注册呢?

其实很简单主线程需要等待所有子线程执行完毕才能继续往下面执行所以必须要phaser.arriveAndAwaitAdvance();阻塞等待,而这个语句是意思当前线程已经到达屏障,在此等待一段时间等条件满足后需要向下一个屏障继续执行,如果没有主线程的phaser.register,直接调用phaser.arriveAndAwaitAdvance,在源码中提到可能会有异常,所以必须在主程序中注册phaser.register();

/* <p>It is a usage error for an unregistered party to invoke this
* method.  However, this error may result in an @code
* IllegalStateException only upon some subsequent operation on
* this phaser, if ever.
*/
译:
未注册方调用此函数是一个使用错误方法。但是,这个错误可能会导致
@codeIllegalStateException仅在一些后续操作这个相位器,如果有的话。

Phaser解决分科考试问题

从体验的示例中其实没看出其优势在哪里,上诉场景完全可以采用CountDownLatch,所以现在换一种场景来说明Phaser的优势。

假设某校举行期末考试,有三门考试语文、数学、英语,每门课允许学生提前交卷,只有当所有学生完成考试后才能举行下一次的考试,这就是典型的分阶段任务处理,示例图如下。

将上诉场景语义化如下

public class PhaserExam 
    public static Random random = new Random(System.currentTimeMillis());
    public static void main(String[] args) 
        // 一次初始化2个 相当于两次register
        Phaser phaser = new Phaser(2);

        for (int i = 0; i <2 ; i++) 
            Exam exam = new Exam(phaser,random.nextLong());
            exam.start();
        
    

    static class Exam extends Thread
        Phaser phaser;
        Long id;
        public Exam(Phaser phaser,Long id)
            this.phaser = phaser;
            this.id = id;
        

        @Override
        public void run() 
            try 
                System.out.println(Thread.currentThread().getName()+"===开始语文考试");
                TimeUnit.SECONDS.sleep(random.nextInt(5));
                System.out.println(Thread.currentThread().getName()+"===结束语文考试");
                phaser.arriveAndAwaitAdvance();

                System.out.println(Thread.currentThread().getName()+"===开始数学考试");
                TimeUnit.SECONDS.sleep(random.nextInt(5));
                System.out.println(Thread.currentThread().getName()+"===结束数学考试");
                phaser.arriveAndAwaitAdvance();

                System.out.println(Thread.currentThread().getName()+"===开始英语考试");
                TimeUnit.SECONDS.sleep(random.nextInt(5));
                System.out.println(Thread.currentThread().getName()+"===结束英语考试");
                phaser.arriveAndAwaitAdvance();
             catch (InterruptedException e) 
                e.printStackTrace();
            
        
    


代码执行结果如下,可以看到三个阶段都是等待所有线程执行完毕后才往下执行,相当于多个栅栏。

到这里请注意,通过Phaser类的构造方法构建的party数,也就是线程数需要和循环的次数对应,不然可能影响后续阶段器的正常运行。

两个重要状态

在Phaser内有2个重要状态,分别是phase和party,乍一看很难理解,他们的定义如下。

phase就是阶段,如上面提到的语文、数学、英语考试这每个考试对应一个阶段,不过phase是从0开始的,当所有任务执行完毕,准备进入下一个阶段时phase就会加一。

party对应注册到Phaser线程数,party初始值有两种形式

  • 方法一就是通过Phaser的有参构造初始化party值。

  • 方法二采用动态注册方法phaser.register()或phaser.bulkRegister(线程数)指定线程数,注销线程调用phaser.arriveAndDeregister()方法party值会减一。

Phaser常用API

Phaser常用API总结如下所示

// 获取Phaser阶段数,默认0
public final int getPhase();
// 向Phaser注册一个线程
public int register();    
// 向Phaser注册多个线程
public int bulkRegister(int parties);
// 获取已经注册的线程数,也就是重要状态party的值
public int getRegisteredParties();
// 到达并且等待其它线程到达
public int arriveAndAwaitAdvance();
// 到达后注销不等待其它线程,继续往下执行
public int arriveAndDeregister();
// 已到达线程数
public int getArrivedParties();
// 未到达线程数
public int getUnarrivedParties();
// Phaser是否结束 只有当party的数量是0或者调用方法forceTermination时才会结束
public boolean isTerminated();
// 结束Phaser
public void forceTermination();

代码演示如下

public class PhaserApiTest 
    public static void main(String[] args) throws InterruptedException 
        Phaser phaser = new Phaser(5);
        System.out.println("当前阶段"+phaser.getPhase());

        System.out.println("注册线程数==="+phaser.getRegisteredParties());
        // 向phaser注册一个线程
        phaser.register();
        System.out.println("注册线程数==="+phaser.getRegisteredParties());
        // 向phaser注册多个线程,批量注册
        phaser.bulkRegister(4);
        System.out.println("注册线程数==="+phaser.getRegisteredParties());

        new Thread(()->
            // 到达且等待
            phaser.arriveAndAwaitAdvance();
            System.out.println(Thread.currentThread().getName()+"===执行1");
        ).start();

        new Thread(()->
            // 到达不等待,从phaser中注销一个线程
            phaser.arriveAndDeregister();
            System.out.println(Thread.currentThread().getName()+"===执行2");
        ).start();

        TimeUnit.SECONDS.sleep(3);

        System.out.println("已到达线程数==="+phaser.getArrivedParties());
        System.out.println("未到达线程数==="+phaser.getUnarrivedParties());

        System.out.println("Phaser是否结束"+phaser.isTerminated());
        phaser.forceTermination();
        System.out.println("Phaser是否结束"+phaser.isTerminated());
    


执行结果如下所示

arriveAndAwaitAdvance解析

arriveAndAwaitAdvance是Phaser中一个重要实现阻塞的API,其实arriveAndAwaitAdvance是由arrive方法和awaitAdvance方法合并而来,两个方法的作用分别为

  • arrive:到达屏障但不阻塞,返回值为到达的阶段号。

  • awaitAdvance(int):接收一个 int 值的阶段号,在指定的屏障处阻塞。

测试代码如下

public class PhaserTestArrive 
    public static Random random = new Random(System.currentTimeMillis());
    public static void main(String[] args) 
        Phaser phaser = new Phaser(5);
        for (int i = 0; i <5 ; i++) 
            new Task(i,phaser).start();
        
        phaser.register();
        // 主线程需要调用arrive的原因是主线程注册的第六个线程还未到达,需要手动到达,才能调用awaitAdvance阻塞屏障
        phaser.arrive();
        // 因为Phaser线程数为6,所以即使5个线程已经到达,但是还差主线程的一个,目前阶段数就是0
        phaser.awaitAdvance(0);
        System.out.println("all task is end");
    

    static class Task extends Thread
        Phaser phaser;

        public Task(int num,Phaser phaser)
            super("Thread--"+String.valueOf(num));
            this.phaser = phaser;
        

        @Override
        public void run() 
            try 
                System.out.println(Thread.currentThread().getName()+"===task1 is start");
                TimeUnit.SECONDS.sleep(random.nextInt(3));
                System.out.println(Thread.currentThread().getName()+"===task1 is end");

                // 到达且不等待
                phaser.arrive();

                System.out.println(Thread.currentThread().getName()+"===task2 is start");
                TimeUnit.SECONDS.sleep(random.nextInt(3));
                System.out.println(Thread.currentThread().getName()+"===task2 is end");
             catch (InterruptedException e) 
                e.printStackTrace();
            
        
    



中断响应

我们需要特别注意的就是Phaser所有API中只有awaitAdvanceInterruptibly是响应中断的,其余全部不会响应中断所以不需要对其进行异常处理,演示如下

public static void main(String[] args) 
        Phaser phaser = new Phaser(3);

        Thread T1 = new Thread(()->
            try 
                phaser.awaitAdvanceInterruptibly(phaser.getPhase());
             catch (InterruptedException e) 
                System.out.println("中断异常");
                e.printStackTrace();
            
            //phaser.arriveAndAwaitAdvance();
        );

        T1.start();

        T1.interrupt();

        phaser.arriveAndAwaitAdvance();
    

以上是关于Phaser并发阶段器的主要内容,如果未能解决你的问题,请参考以下文章

并发编程系列之Phaser用法简介

并发编程系列之Phaser用法简介

Java并发编程之Phaser类

Java线程与并发编程实践----同步器(Phaser)

java并发之同步辅助类Phaser

死磕 java同步系列之Phaser源码解析