Fork/Join 上下文中的 Phaser Vs CyclicBarrier

Posted

技术标签:

【中文标题】Fork/Join 上下文中的 Phaser Vs CyclicBarrier【英文标题】:Phaser Vs CyclicBarrier in the context of Fork/Join 【发布时间】:2019-07-28 11:50:30 【问题描述】:

在尝试了解 Phaser 和 CyclicBarrier 之间的区别时,我遇到了一些链接 Difference between Phaser and CyclicBarrier 和 https://www.infoq.com/news/2008/07/phasers/ 我读到 Phaser 与 Fork/Join 接口兼容,而 CyclicBarrier 不兼容,这里有一个代码来证明这一点:

移相器

 public static void main(String[] args) throws InterruptedException 

        CountDownLatch countDownLatch = new CountDownLatch(1);

        Phaser phaser = new Phaser(16)
            @Override
            protected boolean onAdvance(int phase, int registeredParties) 
                return phase ==1 || super.onAdvance(phase, registeredParties);
            
        ;

        System.out.println("Available Processors: "+Runtime.getRuntime().availableProcessors());

        ExecutorService executorService = ForkJoinPool.commonPool(); // Runtime.getRuntime().availableProcessors() -1

        for (int i = 0; i < 16; i++) 
            final int count = 0;
            executorService.submit(() -> 
                while (!phaser.isTerminated()) 
                    try 
                        Thread.sleep(ThreadLocalRandom.current().nextInt(300, 2000));
                        System.out.println(Thread.currentThread().getName() + count + " ... ");
                        phaser.arriveAndAwaitAdvance();
                        System.out.println(Thread.currentThread().getName() + count + " ... continues ... ");
                     catch (InterruptedException e) 
                        e.printStackTrace();
                    
                
                countDownLatch.countDown();
            );
        
        countDownLatch.await();
    

循环障碍

public static void main(String[] args) throws InterruptedException 

        AtomicInteger phases = new AtomicInteger();
        CountDownLatch  countDownLatch = new CountDownLatch(1);
        CyclicBarrier cyclicBarrier = new CyclicBarrier(16, () -> phases.incrementAndGet());

        ExecutorService executorService = ForkJoinPool.commonPool();

        for (int i = 0; i < 16; i++) 
            executorService.submit(() -> 
                while (phases.get() < 1) 
                    try 
                        Thread.sleep(ThreadLocalRandom.current().nextInt(300, 2000));
                     catch (InterruptedException e) 
                        e.printStackTrace();
                    
                    try 
                        System.out.println(Thread.currentThread().getName() + " Ok, I am waiting ");

                        cyclicBarrier.await();

                        System.out.println(Thread.currentThread().getName() + " continued it's way ... ");
                     catch (BrokenBarrierException e) 
                        e.printStackTrace();
                     catch (InterruptedException e) 
                        e.printStackTrace();
                    
                    countDownLatch.countDown();
                
            );
        
        countDownLatch.await();
    

解释:

这两个代码运行一个 fork/join 线程池,这意味着线程是守护线程,这就是我使用 CountDownLatch 的原因。方法commonPool() 将创建一个线程池,其线程等于Runtime.getRuntime().availableProcessors(),我的是12,所以它将创建12 个线程。两个示例中的 Phaser 和 CyclicBarrier 都定义了 16 个参与方,即他们需要 16 次调用循环屏障中的 await() 和 Phaser 中的 arriveAndAwaitAdvance() 才能继续。

在带有移相器的示例中,当第 12 个线程阻塞时,fork/join 将产生更多线程,它将创建更多线程,因此移相器最终将终止。但是,当第 12 个线程到达await() 时,使用 CyclicBarrier 程序会停止并且永远不会前进,它会挂起。显然,因为屏障需要 16 次调用,才能使线程前进,而创建的线程只进行了 12 次。线程池不会像使用 Phaser 那样创建更多线程来推进 CyclicBarrier。

问题:

fork/join 如何使用 Phaser 而不是 CyclicBarrier 创建更多线程? 为什么 arriveAndAwaitAdvance() 方法使线程池创建新线程,以及如何创建新线程,但方法 await() 没有导致线程池创建更多线程?

【问题讨论】:

【参考方案1】:

Phaser 能够做到这一点,因为它在阻塞线程时会在内部调用 ForkJoinPool.managedBlock(ManagedBlocker)

任何人都可以访问 ForkJoinPool 的这个 API,因此您可以轻松地增强您的 CyclicBarrier 版本以使用它,并消除线程饥饿。例如带有以下氛围的东西:

ForkJoinPool.managedBlock(new ManagedBlocker() 

    boolean isReleasable = false;

    @Override
    public boolean block() throws InterruptedException 
        try 
            cyclicBarrier.await();
         catch (BrokenBarrierException aE) 
            throw new IllegalStateException(aE);
        
        return isReleasable = true;
    

    @Override
    public boolean isReleasable() 
        return isReleasable;
    
);

【讨论】:

以上是关于Fork/Join 上下文中的 Phaser Vs CyclicBarrier的主要内容,如果未能解决你的问题,请参考以下文章

JUC系列Fork/Join框架之概览

编写多人国际象棋游戏:HTML5 Canvas + ReactJS vs. Phaser 3

多线程高并发编程 -- Fork/Join源码分析

多线程高并发编程 -- Fork/Join源码分析

Java Fork/Join 框架

Fork/Join框架