Join,CountDownLatch,CyclicBarrier,Semaphore和Exchanger

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Join,CountDownLatch,CyclicBarrier,Semaphore和Exchanger相关的知识,希望对你有一定的参考价值。

       CountDownLatch允许一个或者多个线程等待其他线程完成操作,之后再对结果做统一处理;

        适用场景,分布式系统中对多个微服务的调用,并发执行并且必须等待全部执行完成才能继续执行后续操作;



        其实在java中默认的实现是join()方法,join()方法主要的作用是当前线程必须等待直到join线程执行完成之后才能继续执行后续的操作,

        其本质就是轮询判断join线程是否存活,如果存活则主线程继续等待,否则,通过调用this.notifyAll()方法来继续执行主线程。

        实例代码如下:

public static void main(String[] args) throws InterruptedException {

        Thread thread1 = new Thread(new Runnable() {

@Override

public void run() {

        System.out.println("this is thread 1");

        }

        });

        Thread thread2 = new Thread(new Runnable() {

@Override

public void run() {

        System.out.println("Thread2 is finish");

        }

        });


        thread1.start();

        thread2.start();

        /*thread1.join();

        thread2.join();*/ (1)

        System.out.println("all parser finish");

        }

        现在的代码是注释掉了两个join()方法的调用,那么输出结果将不能被保证,三个sout的输出打印是乱序的。

        如果将上述的注释(1)去掉,则根据join()方法的定义,可以知main线程会先等待thread1的执行结束才会执行thread2的执行,直到thread2执行结束才会继续往下执行输出:

        "all parser finish";从而保证执行顺序固定,即线程thread1先执行,其次是thread2的执行,最后main线程执行最后的输出。


        那么同样的如果我们用CountDownLatch来实现,则应用代码如下:

static CountDownLatch c =  new CountDownLatch(2);//定义成员变量

public static void main(String[] args) throws InterruptedException{

        new Thread(new Runnable() {

@Override

public void run() {

        System.out.println(1);

        c.countDown();

        System.out.println(2);

        c.countDown();

        }

        }).start();

        c.await();

        System.out.println(3);

        }


        其中定义的CountDownLatch(2)表示等待两个点完成,即当c变成0以后当前线程才会继续执行后续代码,否则由于await()方法,线程会一直等待;

        而每次调用countDown()方法则c就会减一,上述代码在输出1,2之后,因为调用两次countDown之后c变成0,那么c.await()方法会失效,然后main()线程执行最后输出3;


        如果我们要等待的是多个线程的并发执行,则代码如下

static CountDownLatch c  = new CountDownLatch(2);

public static void main(String[] args) throws InterruptedException {

        Thread thread1 = new Thread(new Runnable() {

@Override

public void run() {

        System.out.println("this is thread 1");

        try {

        Thread.sleep(100l);

        } catch (InterruptedException e) {

        e.printStackTrace();

        }

        c.countDown();

        }

        });

        Thread thread2 = new Thread(new Runnable() {

@Override

public void run() {

        System.out.println("Thread2 is finish");

        c.countDown();

        }

        });


        thread1.start();

        thread2.start();

        c.await();

        System.out.println("all parser finish");

        }

        上述代码中我们可以保证的是main线程会等待thread1和thread2线程的执行完成,之后再执行最后的打印,但是不保证thead1和thread2执行的先后顺序即有可能thread1先执行,也有可能thread2先执行;在这一点上有别与join方法,join方法可以保证其是按照调用顺序来执行的。

        注意:在使用CountDownLatch()的过程中必须保证count次数大于0,因为只有count次数大于0才能保证await()方法调用的阻塞。

        等待多线程完成的CountDownLatch和join()方法的使用就到这里结束了。

        CyclicBarrier:同步屏障,作用是使得一组线程到达一个同步点时被阻塞,直到所有线程都到达屏障时,屏障才会消失,所有被拦截的线程才可以继续执行。

        CyclicBarrier的使用方式和CountDownLatch类似;实例代码如下:

static CyclicBarrier c = new CyclicBarrier(2);


public static void main(String[] args) throws InterruptedException {

        new Thread(new Runnable() {

@Override

public void run() {

        try {

        c.await();

        } catch (Exception e) {

        e.printStackTrace();

        }

        System.out.println(1);


        }

        }).start();

        try {

        c.await();

        } catch (Exception e) {

        e.printStackTrace();

        }

        System.out.println(2);


        }

        上述代码的执行结果可能是1,2也可能是2,1,c并没有保证线程的顺序,从目前来看,CyclicBarrier和CountDownLatch几乎实现的是一样的功能。

        但是CyclicBarrier有更强大的功能,即通过构造函数:new CyclicBarrier(int parties,Runnable barrierAction)来保证线程到达同步点的时候,优先执行barrierAction中的任务。实例代码如下:

static CyclicBarrier c = new CyclicBarrier(2, new PrThread());


public static void main(String[] args) {

        new Thread(new Runnable() {

@Override

public void run() {

        try {

        c.await();

        } catch (InterruptedException e) {

        e.printStackTrace();

        } catch (BrokenBarrierException e) {

        e.printStackTrace();

        }

        System.out.println(1);

        }

        }).start();

        try {

        c.await();

        } catch (InterruptedException e) {

        e.printStackTrace();

        } catch (BrokenBarrierException e) {

        e.printStackTrace();

        }

        System.out.println(2);

        }


static class PrThread implements Runnable {


    @Override

    public void run() {

        System.out.println(3);

        try {

            Thread.sleep(1000);

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

    }

}

其中输出顺序被保证为3,1,2,因为count设置为2,所以必须在第一个线程和线程PrThread执行完成之后才能执行主线程,完成输出。

        CyclicBarrier适用于多线程计算最后合并结果的场景。

        还有一点就是CountDownLatch()方法只能用一次,而CyclicBarrier可以通过reset()方法重复调用。至于其他方法比如getNumberWaiting可以获取CyclicBarrier阻塞的线程数等。

        对CyclicBarrier的使用到这就结束了。

        Semaphone:信号量是用来控制同时访问特定资源的线程数量,以保证合理使用有限的公共资源。常用场景是流量控制。实例代码如下:

private static final int THREAD_COUNT = 30;

private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);

private static Semaphore s = new Semaphore(10);


public static void main(String[] args) {

        for (int i = 0; i <THREAD_COUNT ; i++) {

        threadPool.execute(new Runnable() {

@Override

public void run() {

        try {

        s.acquire();

        System.out.println("DO SOMETHING FOR YOURSELF"+s.getQueueLength());

        s.release();

        } catch (InterruptedException e) {

        e.printStackTrace();

        }

        }

        });




        }

        threadPool.shutdown();

        }

        ,其中构造线程池大小为30,而在同一时刻只允许10个线程执行输出;s.acquire()要求获取一个许可,而s.release()表示释放获取到的许可,当线程数超过可用的许可数,则进入等待状态,直到有许可可用,才会继续执行下一个任务。


        信号量的使用到这里就结束了,我们最后再说线程间的交换数据,其实就是线程之间的数据传递:

public class ExchangerTest {

    private static final Exchanger<String> exgr = new Exchanger<String>();

    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);


    public static void main(String[] args) {

        threadPool.execute(new Runnable() {

            @Override

            public void run() {

                try {

                    String thread1 = "要交换的数据1";

                    exgr.exchange(thread1);

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

            }

        });

        threadPool.execute(new Runnable() {

            @Override

            public void run() {

                try {

                    String thread2 = "要交换的数据1";

                    String exchange = exgr.exchange("thread2");

                    System.out.println(exchange.equals(thread2));

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }


            }

        });

        threadPool.shutdown();

    }


};

其中exchange为从thread1中拿到的要对比的数据,然后和thread2做对比,如果是相当则输出true.

        其中exgr携带了需要交互的数据信息。

        到此Exchanger的使用结束。

        并发编程中用的比较多的就是CountDownLatch和CyclicBarrier和Semaphore。所以了解这些有助于我们以后更好的编程


本文出自 “用行动,证明你” 博客,请务必保留此出处http://wuseeker.blog.51cto.com/5759064/1945512

以上是关于Join,CountDownLatch,CyclicBarrier,Semaphore和Exchanger的主要内容,如果未能解决你的问题,请参考以下文章

Java中用CyclicBarrier以及CountDownLatch和join相比有啥不同

java Fork/Join 池、ExecutorService 和 CountDownLatch

Join,CountDownLatch,CyclicBarrier,Semaphore和Exchanger

使用join和CountDownLatch来等待线程结束

多线程分配线程的实现方案:CountDownLatch类

CountDownLatch与join的区别和联系