CountDownLatchCyclicBarrier和Semaphore

Posted genesisx

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了CountDownLatchCyclicBarrier和Semaphore相关的知识,希望对你有一定的参考价值。

一.CountDownLatch用法

CountDownLatch类位于java.util.concurrent包下,利用它可以实现类似计数器的功能。比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了。

CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N。

当我们调用一次CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await会阻塞当前线程,直到N变成零。由于countDown方法可以用在任何地方,所以这里说的N个点,可以是N个线程,也可以是1个线程里的N个执行步骤。用在多个线程时,你只需要把这个CountDownLatch的引用传递到线程里。

其他方法

如果有某个解析sheet的线程处理的比较慢,我们不可能让主线程一直等待,所以我们可以使用另外一个带指定时间的await方法,await(long time, TimeUnit unit): 这个方法等待特定时间后,就会不再阻塞当前线程。join也有类似的方法。

注意:计数器必须大于等于0,只是等于0时候,计数器就是零,调用await方法时不会阻塞当前线程。CountDownLatch不可能重新初始化或者修改CountDownLatch对象的内部计数器的值。

public class CountDownLatchTest {
    private AtomicInteger atomicInteger = new AtomicInteger();

    @Test
    public void cuntDownLatchTest() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(3);
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        executorService.execute(() -> {
            atomicInteger.addAndGet(1000);
            countDownLatch.countDown();
        });
        executorService.execute(() -> {
            atomicInteger.addAndGet(1000);
            countDownLatch.countDown();
        });
        executorService.execute(() -> {
            atomicInteger.addAndGet(1000);
            countDownLatch.countDown();
        });
        System.out.println("等待子线程执行");
        countDownLatch.await();
        System.out.println("子线程执行完毕");
        System.out.println(atomicInteger);

    }
}

 执行结果

等待子线程执行
子线程执行完毕
3000

二.CyclicBarrier用法

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

public class CyclicBarrierTest {
    private static final int THREAD_NUMBER = 3;
    private static CyclicBarrier sCyclicBarrier = new CyclicBarrier(
            THREAD_NUMBER, new Runnable() {
        @Override
        public void run() {
            System.out.println("大家都到达了宿舍楼下,一起出发吧。。。");
        }
    });

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(THREAD_NUMBER);
        for (int i = 0; i < THREAD_NUMBER; i++) {
            executorService.execute(new WalkFromDomitoryToCanteenRunnable(sCyclicBarrier, "同学" + i));
        }
        try {
            Thread.sleep(10000);//主线程睡眠
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("CyclicBarrier重用");
        for (int i = THREAD_NUMBER; i < THREAD_NUMBER * 2; i++) {
            executorService.execute(new WalkFromDomitoryToCanteenRunnable(sCyclicBarrier, "同学" + i));
        }
    }

    /**
     * 从宿舍到食堂线程
     *
     * @author LiuYi
     */
    public static class WalkFromDomitoryToCanteenRunnable implements Runnable {
        private CyclicBarrier mCyclicBarrier;
        private String mName;

        public WalkFromDomitoryToCanteenRunnable(CyclicBarrier cyclicBarrier,
                                                 String name) {
            this.mCyclicBarrier = cyclicBarrier;
            this.mName = name;
        }

        @Override
        public void run() {
            System.out.println(mName + "开始从宿舍出发。。。");
            try {
                Thread.sleep(1000);
                mCyclicBarrier.await();// 等待别同学
                // 前往食堂
                System.out.println(mName + "开始从宿舍楼下出发。。。");
                Thread.sleep(1000);
                System.out.println(mName + "达到食堂。。。");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

 执行结果

同学0开始从宿舍出发。。。
同学2开始从宿舍出发。。。
同学1开始从宿舍出发。。。
大家都到达了宿舍楼下,一起出发吧。。。
同学0开始从宿舍楼下出发。。。
同学2开始从宿舍楼下出发。。。
同学1开始从宿舍楼下出发。。。
同学0达到食堂。。。
同学2达到食堂。。。
同学1达到食堂。。。
CyclicBarrier重用
同学3开始从宿舍出发。。。
同学4开始从宿舍出发。。。
同学5开始从宿舍出发。。。
大家都到达了宿舍楼下,一起出发吧。。。
同学3开始从宿舍楼下出发。。。
同学4开始从宿舍楼下出发。。。
同学5开始从宿舍楼下出发。。。
同学4达到食堂。。。
同学5达到食堂。。。
同学3达到食堂。。。

 

CyclicBarrier的应用场景

CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个Excel保存了用户所有银行流水,每个Sheet保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。

CyclicBarrier和CountDownLatch的区别

  • CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
  • CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。比如以下代码执行完之后会返回true。

三.Semaphore用法

  Semaphore翻译成字面意思为 信号量,Semaphore可以控同时访问的线程个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。

  Semaphore类位于java.util.concurrent包下,它提供了2个构造器:

public Semaphore(int permits) {          //参数permits表示许可数目,即同时可以允许多少线程进行访问
    sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {    //这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可
    sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
}

 假若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。那么我们就可以通过Semaphore来实现:

public class Test {
    public static void main(String[] args) {
        int N = 8;            //工人数
        Semaphore semaphore = new Semaphore(5); //机器数目
        for(int i=0;i<N;i++)
            new Worker(i,semaphore).start();
    }
     
    static class Worker extends Thread{
        private int num;
        private Semaphore semaphore;
        public Worker(int num,Semaphore semaphore){
            this.num = num;
            this.semaphore = semaphore;
        }
         
        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println("工人"+this.num+"占用一个机器在生产...");
                Thread.sleep(2000);
                System.out.println("工人"+this.num+"释放出机器");
                semaphore.release();           
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

 执行结果

工人0占用一个机器在生产...
工人1占用一个机器在生产...
工人2占用一个机器在生产...
工人4占用一个机器在生产...
工人5占用一个机器在生产...
工人0释放出机器
工人2释放出机器
工人3占用一个机器在生产...
工人7占用一个机器在生产...
工人4释放出机器
工人5释放出机器
工人1释放出机器
工人6占用一个机器在生产...
工人3释放出机器
工人7释放出机器
工人6释放出机器

 

以上是关于CountDownLatchCyclicBarrier和Semaphore的主要内容,如果未能解决你的问题,请参考以下文章