java并发系列-----线程之间的协作(waitnotifyjoinCountDownLatchCyclicBarrier)

Posted alimayun

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java并发系列-----线程之间的协作(waitnotifyjoinCountDownLatchCyclicBarrier)相关的知识,希望对你有一定的参考价值。

在java中,线程之间的切换是由操作系统说了算的,操作系统会给每个线程分配一个时间片,在时间片到期之后,线程让出cpu资源,由其他线程一起抢夺,那么如果开发想自己去在一定程度上(因为没办法100%控制它)让线程之间互相协作、通信,有哪些方式呢?

 

wait、notify、notifyAll

1、void wait( ) 
导致当前的线程等待,直到其他线程调用此对象的notify( ) 方法或 notifyAll( ) 方法
2、void wait(long timeout) 
导致当前的线程等待,直到其他线程调用此对象的notify() 方法或 notifyAll() 方法,或者指定的时间过完。
3、void notify() 
唤醒在此对象监视器上等待的单个线程
4、void notifyAll() 
唤醒在此对象监视器上等待的所有线程

举例说明:

 

public class WaitTest1 {

    public static void main(String[] args) {

        ThreadA ta = new ThreadA("ta");

        synchronized(ta) { // 通过synchronized(ta)获取“对象ta的同步锁”
            try {
                System.out.println(Thread.currentThread().getName()+" start ta");
                ta.start();

                System.out.println(Thread.currentThread().getName()+" block");
                ta.wait();    // 等待

                System.out.println(Thread.currentThread().getName()+" continue");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class ThreadA extends Thread{

        public ThreadA(String name) {
            super(name);
        }

        public void run() {
            synchronized (this) { // 通过synchronized(this)获取“当前对象的同步锁”。这个this代表是内部类ThreadA的对象。
                System.out.println(Thread.currentThread().getName()+" wakup others");
                notify();    // 唤醒“当前对象上的等待线程”
            }
        }
    }
}

 

运行结果

main start ta
main block
ta wakup others
main continue

 

注:notify()与notifyAll()区别:notify()会使等待获取某对象锁的一个线程到Runnable状态,但是notifyAll()会使所有的线程到Runnable状态,虽然notifyAll()方法性能开销略大,但是不存在信号丢失问题,因此优先推荐使用notifyAll()。 

 

Thread.join()

1、作用

当一个线程希望等待另外一个或多个线程运行结束时可以使用。

 

2、方法签名

public final void join()throws InterruptedException  
//等待多少秒
public void join(long millis)throwsInterruptedException  
//看了下源码,也不知道这方法是干啥的。。。
public final void join(long millis, int nanos)throws InterruptedException

 

3、示例

package com.ty.thread;

public class JoinDemo {

    class Boss implements Runnable {

        @Override
        public void run() {
            System.out.println("当前线程为:" + Thread.currentThread().getName() + "老板给工人发工资");
        }
        
    }
    
    static class Worker implements Runnable {

        @Override
        public void run() {
            System.out.println("当前线程为:" + Thread.currentThread().getName() + "工人准备干活喽*****");
            
            try {
                //模拟工人干活所花费的时间
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            System.out.println("当前线程为:" + Thread.currentThread().getName() + "工人干活结束*****");
        }
        
    }
    
    public static void main(String[] args) throws InterruptedException {
        Thread worker1 = new Thread(new Worker(), "工人1线程");
        Thread worker2 = new Thread(new Worker(), "工人2线程");
        Thread worker3 = new Thread(new Worker(), "工人3线程");
        //worker与boss一个为static,一个不为static,只是为了展示下不同的写法
        Thread boss = new Thread(new JoinDemo().new Boss(), "boss线程");
        worker1.start();
        worker2.start();
        worker3.start();
        //等待工人干完活,老板才能给钱啊,可恶的资本主义。。。
        worker1.join();
        worker2.join();
        worker3.join();
        boss.start();
    }
}

运行结果如下:

当前线程为:工人1线程工人准备干活喽*****
当前线程为:工人3线程工人准备干活喽*****
当前线程为:工人2线程工人准备干活喽*****
当前线程为:工人1线程工人干活结束*****
当前线程为:工人2线程工人干活结束*****
当前线程为:工人3线程工人干活结束*****
当前线程为:boss线程老板给工人发工资

反正boss线程肯定是等所有工人线程运行结束之后才开始运行

 

Condition

 

wait()/notify()存在的问题:
1、过于底层,并且不好控制
2、存在过早唤醒的情况
3、wait(long)无法区分是等待超时还是被通知线程唤醒

 

技术图片

过早唤醒:上图中Notify1线程调用了notifyAll()或notify(),唤醒了wait4线程,但是其实这时候wait4不满足唤醒条件,这种情况就叫做过早唤醒。可以这么理解,你在大街上看到一个美女,然后你大喊一声美女,N个女生回头,就是这个意思。

 

因此出现了Condition接口,它作为wait/notify的替代品,解决了过早唤醒的情况,并且解决了wait(long)不能区分其返回是否由等待超时而导致的问题。Condition中的await()、signal()以及signalAll()分别替代wait()、notify()以及notifyAll()。不同的是,Object中的wait(),notify(),notifyAll()方法是和"同步锁"(synchronized关键字)捆绑使用的;而Condition是需要与"互斥锁"/"共享锁"捆绑使用的。

 

主要方法:

// 造成当前线程在接到信号或被中断之前一直处于等待状态。
void await()
// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
boolean await(long time, TimeUnit unit)
// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
long awaitNanos(long nanosTimeout)
// 造成当前线程在接到信号之前一直处于等待状态。
void awaitUninterruptibly()
// 造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。
boolean awaitUntil(Date deadline)
// 唤醒一个等待线程。
void signal()
// 唤醒所有等待线程。
void signalAll()

 

示例:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionTest1 {
        
    private static Lock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();

    public static void main(String[] args) {

        ThreadA ta = new ThreadA("ta");

        lock.lock(); // 获取锁
        try {
            System.out.println(Thread.currentThread().getName()+" start ta");
            ta.start();

            System.out.println(Thread.currentThread().getName()+" block");
            condition.await();    // 等待

            System.out.println(Thread.currentThread().getName()+" continue");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();    // 释放锁
        }
    }

    static class ThreadA extends Thread{

        public ThreadA(String name) {
            super(name);
        }

        public void run() {
            lock.lock();    // 获取锁
            try {
                System.out.println(Thread.currentThread().getName()+" wakup others");
                condition.signal();    // 唤醒“condition所在锁上的其它线程”
            } finally {
                lock.unlock();    // 释放锁
            }
        }
    }
}

运行结果:

main start ta
main block
ta wakup others
main continue

那么Condition是如何预防过早唤醒的呢?如下图所示:

技术图片

其实本质就是不同的notify线程跟wait线程之间使用不同的condition,与wait/notify相比,粒度更细,因此预防过早唤醒的情况出现。

并且针对一个锁,可以有多个condition,示例如下:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull  = lock.newCondition(); 
    final Condition notEmpty = lock.newCondition(); 

    final Object[] items = new Object[5];
    int putptr, takeptr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();    //获取锁
        try {
            // 如果“缓冲已满”,则等待;直到“缓冲”不是满的,才将x添加到缓冲中。
            while (count == items.length)
                notFull.await();
            // 将x添加到缓冲中
            items[putptr] = x; 
            // 将“put统计数putptr+1”;如果“缓冲已满”,则设putptr为0。
            if (++putptr == items.length) putptr = 0;
            // 将“缓冲”数量+1
            ++count;
            // 唤醒take线程,因为take线程通过notEmpty.await()等待
            notEmpty.signal();

            // 打印写入的数据
            System.out.println(Thread.currentThread().getName() + " put  "+ (Integer)x);
        } finally {
            lock.unlock();    // 释放锁
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();    //获取锁
        try {
            // 如果“缓冲为空”,则等待;直到“缓冲”不为空,才将x从缓冲中取出。
            while (count == 0) 
                notEmpty.await();
            // 将x从缓冲中取出
            Object x = items[takeptr]; 
            // 将“take统计数takeptr+1”;如果“缓冲为空”,则设takeptr为0。
            if (++takeptr == items.length) takeptr = 0;
            // 将“缓冲”数量-1
            --count;
            // 唤醒put线程,因为put线程通过notFull.await()等待
            notFull.signal();

            // 打印取出的数据
            System.out.println(Thread.currentThread().getName() + " take "+ (Integer)x);
            return x;
        } finally {
            lock.unlock();    // 释放锁
        }
    } 
}

public class ConditionTest2 {
    private static BoundedBuffer bb = new BoundedBuffer();

    public static void main(String[] args) {
        // 启动10个“写线程”,向BoundedBuffer中不断的写数据(写入0-9);
        // 启动10个“读线程”,从BoundedBuffer中不断的读数据。
        for (int i=0; i<10; i++) {
            new PutThread("p"+i, i).start();
            new TakeThread("t"+i).start();
        }
    }

    static class PutThread extends Thread {
        private int num;
        public PutThread(String name, int num) {
            super(name);
            this.num = num;
        }
        public void run() {
            try {
                Thread.sleep(1);    // 线程休眠1ms
                bb.put(num);        // 向BoundedBuffer中写入数据
            } catch (InterruptedException e) {
            }
        }
    }

    static class TakeThread extends Thread {
        public TakeThread(String name) {
            super(name);
        }
        public void run() {
            try {
                Thread.sleep(10);                    // 线程休眠1ms
                Integer num = (Integer)bb.take();    // 从BoundedBuffer中取出数据
            } catch (InterruptedException e) {
            }
        }
    }
}

运行结果如下;

p1 put  1
p4 put  4
p5 put  5
p0 put  0
p2 put  2
t0 take 1
p3 put  3
t1 take 4
p6 put  6
t2 take 5
p7 put  7
t3 take 0
p8 put  8
t4 take 2
p9 put  9
t5 take 3
t6 take 6
t7 take 7
t8 take 8
t9 take 9

先描述下这种场景,有一个缓冲区,另外有一个读线程,一个写线程。读线程与写线程互不干扰的工作,但是当写线程开始工作后,通知读线程去读,并且读线程开始读之后,通知写线程工作。这种场景用wait/notify机制根本无法实现,但是可以通过多个condition可以实现。也就是说比如A、B两个线程,A需要唤醒B,使用一个condition,但同时B也需要唤醒A,就得使用另一个condition。

 

CountDownLatch

countDownLatch与thread.join()功能类似,但是join的粒度太大,必须要等整个线程执行结束才能执行后续相关动作,但是countDownLatch可以实现更精细的粒度。

1、原理

其实这种方式也就是所谓的闭锁,一种同步方法,可以延迟线程的进度直到线程到达某个终点状态。通俗的讲就是,一个闭锁相当于一扇大门,在大门打开之前所有线程都被阻断,一旦大门打开所有线程都将通过,但是一旦大门打开,所有线程都通过了,那么这个闭锁的状态就失效了,门的状态也就不能变了,只能是打开状态。也就是说闭锁的状态是一次性的,它确保在闭锁打开之前所有特定的活动都需要在闭锁打开之后才能完成。内部维护一个用于表示未完成的先决操作数量的计数器,通过countDown()方法来完成的。每调用一次这个方法,在构造函数中初始化的count值就减1,所以当N个线程都调用了这个方法count的值等于0,然后主线程就能通过await方法,恢复自己的任务。

 

2、主要方法

public CountDownLatch(int count); //指定计数的次数,只能被设置1次
public void countDown();          //调用此方法则计数减1
public void await() throws InterruptedException   //调用此方法会一直阻塞当前线程,直到计时器的值为0,除非线程被中断。
public Long getCount();           //得到当前的计数
public boolean await(long timeout, TimeUnit unit) //调用此方法会一直阻塞当前线程,直到计时器的值为0,除非线程被中断或者计数器超时,返回false代表计数器超时。

 

3、示例

public class CountDownLatchDemo {  
    final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");  
    public static void main(String[] args) throws InterruptedException {  
        CountDownLatch latch=new CountDownLatch(2);//两个工人的协作  
        Worker worker1=new Worker("zhang san", 5000, latch);  
        Worker worker2=new Worker("li si", 8000, latch);  
        worker1.start();//  
        worker2.start();//  
        latch.await();//等待所有工人完成工作  
        System.out.println("all work done at "+sdf.format(new Date()));  
    }  
      
      
    static class Worker extends Thread{  
        String workerName;   
        int workTime;  
        CountDownLatch latch;  
        public Worker(String workerName ,int workTime ,CountDownLatch latch){  
             this.workerName=workerName;  
             this.workTime=workTime;  
             this.latch=latch;  
        }  
        public void run(){  
            System.out.println("Worker "+workerName+" do work begin at "+sdf.format(new Date()));  
            doWork();//工作了  
            System.out.println("Worker "+workerName+" do work complete at "+sdf.format(new Date()));  
            latch.countDown();//工人完成工作,计数器减一  
  
        }  
          
        private void doWork(){  
            try {  
                Thread.sleep(workTime);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
}

 

CyclicBarrier(栅栏)

1、概念

字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做barrier,当调用await()方法之后,线程就处于barrier了。

2、构造器

public CyclicBarrier(int parties, Runnable barrierAction) {
}
 
public CyclicBarrier(int parties) {
}

3、主要方法

public int await() throws InterruptedException, BrokenBarrierException { };
public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException { };

4、示例

场景:假若有若干个线程都要进行写数据操作,并且只有所有线程都完成写数据操作之后,这些线程才能继续做后面的事情

public class Test {
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N);
        for(int i=0;i<N;i++)
            new Writer(barrier).start();
    }
    static class Writer extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
 
        @Override
        public void run() {
            System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
            try {
                Thread.sleep(5000);      //以睡眠来模拟写入数据操作
                System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
            System.out.println("所有线程写入完毕,继续处理其他任务...");
        }
    }
}

运行结果:

线程Thread-0正在写入数据...
线程Thread-3正在写入数据...
线程Thread-2正在写入数据...
线程Thread-1正在写入数据...
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...

 

以上是关于java并发系列-----线程之间的协作(waitnotifyjoinCountDownLatchCyclicBarrier)的主要内容,如果未能解决你的问题,请参考以下文章

Java多线程系列:线程的五大状态,以及线程之间的通信与协作

漫谈并发编程:线程之间的协作

Java 线程系列Java线程之间的共享和协作

java并发编程 线程间协作

Java并发之线程间协作Object的wait()notify()notifyAll()

Java 并发编程:线程间的协作(wait/notify/sleep/yield/join)