JUC提供的几种线程之间协作的工具类

Posted plumswine

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC提供的几种线程之间协作的工具类相关的知识,希望对你有一定的参考价值。


CountDownLatch 倒计时门闩

    /**
     * CountDownLatch不能被重用, 如果需要重新计数, 可以考虑CyclicBarrier
     * all wait for onw, 如多个运动员等待裁判员鸣枪, 比赛开始
     * one wait for all, 如等待所有运动员到达终点, 比赛结束
     */
    @Test
    public void testCountDownLatch() throws InterruptedException {
        int size = 10;
        final CountDownLatch end = new CountDownLatch(size);
        final CountDownLatch begin = new CountDownLatch(1);
        for (int i = 0; i < size; i++) {
            executors.submit(new Runnable() {
                @Override
                public void run() {
                    LOGGER.info("{} 准备起跑", Thread.currentThread().getName());
                    try {
                        begin.await();
                    } catch (InterruptedException e) {
                        // do nothing
                    }
                    LOGGER.info("{} 起跑", Thread.currentThread().getName());

                    try {
                        // 随机的跑步时间
                        long time = Math.abs(new Random().nextInt(20));
                        Thread.sleep(time);
                        LOGGER.info("{} 到达终点", Thread.currentThread().getName());
                    } catch (InterruptedException e) {
                        // do nothing
                    }
                    end.countDown();
                }
            });
        }
        Thread.sleep(100L);
        LOGGER.info("裁判员鸣枪, 比赛开始");
        begin.countDown();
        end.await();
        LOGGER.info("所有运动员到达终点, 比赛结束");
    }

CyclicBarrier 循环珊栏

    /**
     * 等待固定线程到达了珊栏位置, 所有线程才能继续向下执行
     * 类似于在等待所有人都到齐后, 大家才开始讨论工作
     * <p>
     * CyclicBarrier的对象是一个一个的线程
     * CountDownLatch则是一个又一个的事件
     */
    @Test
    public void testCyclicBarrier() throws InterruptedException {
        int num = 3;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(num);
        LOGGER.info("等待第一次讨论");
        for (int i = 0; i < num; i++) {
            Thread.sleep(100L);
            executors.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(300L);
                    } catch (InterruptedException e) {
                        // do nothing
                    }
                    LOGGER.info("{} 到达", Thread.currentThread().getName());
                    try {
                        cyclicBarrier.await();
                    } catch (InterruptedException e) {
                        // do nothing
                    } catch (BrokenBarrierException e) {
                        // do nothing
                    }
                    // 三个线程统一在此处, 一起讨论工作
                    LOGGER.info("{} 一起讨论工作", Thread.currentThread().getName());
                }
            });
        }
        Thread.sleep(1000L);
        // 在使用完一次之后, 自动重用
        LOGGER.info("等待第二次讨论");
        for (int i = 0; i < num; i++) {
            Thread.sleep(100L);
            executors.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(300L);
                    } catch (InterruptedException e) {
                        // do nothing
                    }
                    LOGGER.info("{} 到达", Thread.currentThread().getName());
                    try {
                        cyclicBarrier.await();
                    } catch (InterruptedException e) {
                        // do nothing
                    } catch (BrokenBarrierException e) {
                        // do nothing
                    }
                    // 三个线程统一在此处, 一起讨论工作
                    LOGGER.info("{} 一起讨论工作", Thread.currentThread().getName());
                }
            });
        }
        Thread.sleep(1000L);
        // 除了自动重用之外, 还可以手动的reset
        // cyclicBarrier.reset();
        LOGGER.info("{} end", Thread.currentThread().getName());
    }

Semaphore 信号量

    @Test
    public void testSemaphore() throws InterruptedException {
        // true参数表示是否公平, 默认是非公平
        final Semaphore semaphore = new Semaphore(3, true);
        for (int i = 0; i < 10; i++) {
            executors.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        // acquire必须响应中断, 还有一种不需要响应中断的api
                        // semaphore.acquireUninterruptibly();
                        // acquire还支持传入int参数, 线程或者说任务可以根据权重拿到多个许可证
                        semaphore.acquire();
                        // 还可以try
                        // semaphore.tryAcquire();
                        LOGGER.info("{} 获取到许可证", Thread.currentThread().getName());
                        Thread.sleep(300L);
                    } catch (InterruptedException e) {
                        // do nothing
                    }
                    semaphore.release();
                }
            });
        }
        Thread.sleep(2000L);
        LOGGER.info("end");
    }

Condition 条件对象

    /**
     * Condition的signal方法必须在持有锁的情况下才能执行
     */
    @Test
    public void testCondition() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        for (int i = 0; i < 10; i++) {
            executors.submit(new Runnable() {
                @Override
                public void run() {
                    lock.lock();
                    try {
                        LOGGER.info("{} 开始等待, 释放锁", Thread.currentThread().getName());
                        // await的时候释放锁, 醒来的时候则去争抢锁
                        condition.await();
                        LOGGER.info("{} 醒来, 获取到锁, 执行任务", Thread.currentThread().getName());
                        Thread.sleep(100L);
                    } catch (InterruptedException e) {
                        // do nothing
                    } finally {
                        LOGGER.info("{} 执行任务完毕, 释放锁", Thread.currentThread().getName());
                        lock.unlock();
                    }
                }
            });
        }
        Thread.sleep(200L);

        lock.lock();
        try {
            condition.signalAll();
        } finally {
            lock.unlock();
        }
        Thread.sleep(2000L);
    }

以上是关于JUC提供的几种线程之间协作的工具类的主要内容,如果未能解决你的问题,请参考以下文章

JUC下的几种线程计数器以及读写锁

JUC并发编程 共享模式之工具 JUC CountdownLatch(倒计时锁) -- CountdownLatch应用(等待多个线程准备完毕( 可以覆盖上次的打印内)等待多个远程调用结束)(代码片段

JUC并发编程 共享模式之工具 线程池 JDK 提供的线程池工具类 -- ThreadPoolExecutor(关闭线程池: shutdownshutdownNow)

003-多线程-JUC线程池-几种特殊的ThreadPoolExecutornewFixedThreadPoolnewCachedThreadPoolnewSingleThreadExecuto

java高并发系列 - 第15天:JUC中的Semaphore,最简单的限流工具类,必备技能

JUC并发编程 共享模式之工具 线程池 JDK 提供的线程池工具类 -- Executors 类创建线程池(创建ThreadPoolExecutor对象)