线程间配合:ConditionSemaphoreCountDownLatchCyclicBarrier

Posted muuu520

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程间配合:ConditionSemaphoreCountDownLatchCyclicBarrier相关的知识,希望对你有一定的参考价值。

1 重入锁的好搭档:Condition条件

如果大家理解了Object.wait()Object.notify()方法的话,那么就能很容易理解Condition对象了。它和wait()notify()方法的作用大致是相同的。我们通过重入锁Lock接口的new Condition()方法可以生成一个与当前重入锁绑定的Condition实例。利用Condition对象,我们就可以让线程在合适的时间等待,或者在某一特定时刻得到通知,继续执行。
Condition接口的基本方法如下:

public interface Condition {
    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();
}

以上方法的含义如下:

  • await():该方法会使当前线程在Condition队列(等待池)中等待,同时释放当前锁。当其他线程中使用signal()或者signalAll()方法时,线程会被唤醒并且加入到锁池中,准备竞争锁;或者当线程被中断时,也能跳出等待。
  • awaitUninterruptibly(),该方法与await()基本相同,只不过不会在等待过程中响应中断。
  • signal()方法用于唤醒一个处于等待池的线程,而signalAll()会唤醒所有处于等待池的线程。

2 允许多个线程同时访问:信号量(Semaphore)

信号量为多线程写作提供了更为强大的控制方法。无论是内部锁synchronized还是重入锁ReentrantLock,一次都只允许一个线程访问一个资源,而信号量却可以指定多个线程同时访问某一个资源。信号量主要提供了以下构造函数:

public Semaphore(int permits);
public Semaphore(int permits, boolean fair);

在构造信号量对象时,必须要指定信号量的准入数,即同时能申请多少个许可。当每个线程每次只申请一个许可时,这就相当于指定了同时有多少个线程可以访问某一个资源。信号量的主要逻辑方法有:

public void acquire();
public void acquireUninterruptibly();
public boolean tryAquire();
public boolean tryAquire(long timeout, TimeUnit unit);
public void release();

acquire()方法尝试获得一个准入的许可,若无法获得,则线程会等待,直到有线程释放许可或者当前线程被中断。acquireUninterruptibly()方法和acquire()方法类似,但是不相应中断。tryAcquire()尝试获得一个许可,如果成功则返回true,失败则返回false,它不会进行等待,立即返回。release()用于在线程访问资源结束后,释放一个许可,以使其他等待许可的线程可以进行资源访问。

3 倒计时器:CountDownLatch

CountDown在英文中意为倒计数,Latch为门闩的意思。在这里,门闩的含义是:把门锁起来,不让里面的线程跑出来,因此这个工具通常用来控制线程等待,它可以让某一个线程等待直到倒计时结束,再开始执行。
CountDownLatch的构造函数接收一个整数作为参数,即当前这个计数器的计数个数。请看下面的示例:

/**
 * @author mutianjie
 * @date 2020/5/15 10:22 上午
 * 对于火箭发射,为了保证万无一失,往往要进行各种检查,等所有设备检查完毕后才能点燃引擎。
 */
public class CountDownLatchDemo implements Runnable{
    static final CountDownLatch end = new CountDownLatch(20);
    static final CountDownLatchDemo demo = new CountDownLatchDemo();
    @Override
    public void run() {
        try {
            // 模拟检查任务
            Thread.sleep(new Random().nextInt(10) * 1000);
            System.out.println("Check Complete!");
            end.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            exec.submit(demo);
        }
        // 等待检查
        // 官方Javadoc:Causes the current thread to wait until the latch has counted down to zero,
        // unless the thread is interrupted.
        end.await();
        // 发射火箭
        System.out.println("Fire!");
        exec.shutdown();
    }
}

主线程执行end.await()等待,直到计数器减为0之后才继续执行。

4 循环栅栏:CyclicBarrier

CyclicBarrier是另外一种多线程并发控制工具。CyclicBarrier可以理解为循环栅栏。栅栏就是一种障碍物,比如,通常在私人宅邸的周围就可以围上一圈栅栏,阻止闲杂人等入内。这里当然是用来阻止线程继续执行,要求线程在栅栏处等待。而这里循环的意思,也就是说这个计数器可以循环使用。比如,我们假设将计数器设置为10,那么凑齐第一批10个线程后,计数器就会归零,然后接着凑齐下一批10个线程。
CyclicBarrier的构造函数除了接收计数总数parties,还可以接收一个参数Runnable barrierAction,这个参数表示当计数器完成一轮计数之后,系统会执行的动作。

public CyclicBarrier(int parties, Runnable barrierAction)

举个例子:

public class CyclicBarrierDemo {

    static class TaskThread extends Thread {
        
        CyclicBarrier barrier;
        
        public TaskThread(CyclicBarrier barrier) {
            this.barrier = barrier;
        }
        
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                System.out.println(getName() + " 到达栅栏 A");
                barrier.await();
                System.out.println(getName() + " 冲破栅栏 A");
                
                Thread.sleep(2000);
                System.out.println(getName() + " 到达栅栏 B");
                barrier.await();
                System.out.println(getName() + " 冲破栅栏 B");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    public static void main(String[] args) {
        int threadNum = 5;
        CyclicBarrier barrier = new CyclicBarrier(threadNum, new Runnable() {
            
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " 完成最后任务");
            }
        });
        
        for(int i = 0; i < threadNum; i++) {
            new TaskThread(barrier).start();
        }
    }
    
}

打印结果:

Thread-1 到达栅栏 A
Thread-3 到达栅栏 A
Thread-0 到达栅栏 A
Thread-4 到达栅栏 A
Thread-2 到达栅栏 A
Thread-2 完成最后任务
Thread-2 冲破栅栏 A
Thread-1 冲破栅栏 A
Thread-3 冲破栅栏 A
Thread-4 冲破栅栏 A
Thread-0 冲破栅栏 A
Thread-4 到达栅栏 B
Thread-0 到达栅栏 B
Thread-3 到达栅栏 B
Thread-2 到达栅栏 B
Thread-1 到达栅栏 B
Thread-1 完成最后任务
Thread-1 冲破栅栏 B
Thread-0 冲破栅栏 B
Thread-4 冲破栅栏 B
Thread-2 冲破栅栏 B
Thread-3 冲破栅栏 B



以上是关于线程间配合:ConditionSemaphoreCountDownLatchCyclicBarrier的主要内容,如果未能解决你的问题,请参考以下文章

多线程线程间的通信和协作

线程间协作:waitnotifynotifyAll

线程间协作:waitnotifynotifyAll

waitnotifynotifyAll实现线程间通信

使用条件变量进行线程间的同步

线程汇总