多线程必看之JAVA线程并发辅助类

Posted JAVA葵花宝典

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程必看之JAVA线程并发辅助类相关的知识,希望对你有一定的参考价值。

前言:多线程并发在我们的业务场景很常见,但是为了解决多线程的并发问题并不容易,即便你是中高级程序员亦或是架构师,你都不一定能够吃透并解决多线程并发场景所暴露的问题,今天抛砖引玉来给大家介绍下多程序并发的五种同步辅助类,希望在线程并发上能为大家以后提供一下思路以及解决问题。

1.CountDownLatch 

CountDownLatch 是一种非常简单、但很常用的同步辅助类。其作用是在完成一组正在其他线程中执行的操作之前,允许一个或多个线程一直阻塞。用给定的计数初始化CountDownLatch。由于调用了countDown()方法,所以在当前计数到达零之前,await方法会一直受阻塞。之后,会释放所有等待的线程,await的所有后续调用都将立即返回

示例代码

  1. //创建时,就需要指定参与的parties个数  

  2. int parties = 12;  

  3. CountDownLatch latch = new CountDownLatch(parties);  

  4. //线程池中同步task  

  5. ExecutorService executor = Executors.newFixedThreadPool(parties);  

  6. for(int i = 0; i < parties; i++) {  

  7.     executor.execute(new Runnable() {  

  8.         @Override  

  9.         public void run() {  

  10.             try {  

  11.                 //可以在任务执行开始时执行,表示所有的任务都启动后,主线程的await即可解除  

  12.                 //latch.countDown();  

  13.                 //run  

  14.                 //..  

  15.                 Thread.sleep(3000);  

  16.   

  17.             } catch (Exception e) {  

  18.   

  19.             }  

  20.             finally {  

  21.                 //任务执行完毕后:到达  

  22.                 //表示所有的任务都结束,主线程才能继续  

  23.                 latch.countDown();  

  24.             }  

  25.         }  

  26.     });  

  27. }  

  28. latch.await();//主线程阻塞,直到所有的parties到达  

  29. //latch上所有的parties都达到后,再次执行await将不会有效,  

  30. //即barrier是不可重用的  

  31. executor.shutdown(); 


2.CyclicBarrier

CyclicBarrier 一种可重置的多路同步点,在某些并发编程场景很有用。它允许一组线程互相等待,直到到达某个公共的屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier在释放等待线程后可以重用,所以称它为循环的barrier。

下面看看对应的方法。

  • public CyclicBarrier(int parties, Runnable barrierAction) 
    创建一个新的CycleBarrier,它将在给定数量的参与者(线程)处于等待状态时候启动,并在启动barrier时执行给定的屏障操作,该操作由最后一个今日的屏障的线程执行。
    参数barrierAction是在启动屏障的时候执行命令,如果不执行任何操作则该参数是null。

  • public int await() 
    在所有参与者都已经在此barrier上调用 await方法之前,将一直等待。如果当前线程部署将要到达的最后一个线程将禁用它。

  • reset 
    将屏障重置为其初始化状态,如果所有参与者目前都在屏障处等待,则它们将返回,而且会抛出一个异常。

  • getNumberWaiting 
    返回当前在屏障处等待的参与者数目。


3.Semaphore(信号量)

信号量是一类经典的同步工具。信号量通常用来限制线程可以同时访问的(物理或逻辑)资源数量。

我们以一个停车场运作为例来说明信号量的作用。假设停车场只有三个车位,一开始三个车位都是空的。这时如果同时来了三辆车,看门人允许其中它们进入进入,然后放下车拦。以后来的车必须在入口等待,直到停车场中有车辆离开。这时,如果有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开一辆,则又可以放入一辆,如此往复。

在这个停车场系统中,车位是公共资源,每辆车好比一个线程,看门人起的就是信号量的作用。信号量是一个非负整数,表示了当前公共资源的可用数目(在上面的例子中可以用空闲的停车位类比信号量),当一个线程要使用公共资源时(在上面的例子中可以用车辆类比线程),首先要查看信号量,如果信号量的值大于1,则将其减1,然后去占有公共资源。如果信号量的值为0,则线程会将自己阻塞,直到有其它线程释放公共资源。

在信号量上我们定义两种操作: acquire(获取) 和 release(释放)。当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1),要么一直等下去,直到有线程释放信号量,或超时。release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。

信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。

使用示例

papublic class SemaphoreDemo { 
       private Semaphore smp = new Semaphore(3); private Random rnd = new Random(); class TaskDemo implements Runnable{
       private String id; TaskDemo(String id){
                   this.id = id; }
                       public void run(){
                           try { smp.acquire(); System.out.println("Thread " + id + " is working"); Thread.sleep(rnd.nextInt(1000)); smp.release(); System.out.println("Thread " + id + " is over"); } catch (InterruptedException e) { } } } public static void main(String[] args){ SemaphoreDemo semaphoreDemo = new SemaphoreDemo(); //注意我创建的线程池类型, ExecutorService se = Executors.newCachedThreadPool(); se.submit(semaphoreDemo.new TaskDemo("a")); se.submit(semaphoreDemo.new TaskDemo("b")); se.submit(semaphoreDemo.new TaskDemo("c")); se.submit(semaphoreDemo.new TaskDemo("d")); se.submit(semaphoreDemo.new TaskDemo("e")); se.submit(semaphoreDemo.new TaskDemo("f")); se.shutdown(); }
}

运行结果

Thread c is working

Thread b is working

Thread a is working

Thread c is over

Thread d is working

Thread b is over

Thread e is working

Thread a is over

Thread f is working

Thread d is over

Thread e is over

Thread f is over

可以看出,最多同时有三个线程并发执行,也可以认为有三个公共资源(比如计算机的三个串口)。

参考内容:http://www.cnblogs.com/nullzx/p/5270233.html




4.Phaser

在JAVA 1.7引入了一个新的并发API:Phaser,一个可重用的同步barrier。在此前,JAVA已经有CyclicBarrier、CountDownLatch这两种同步barrier,但是Phaser更加灵活,而且侧重于“重用”。

API简述

     1、Phaser():构造函数,创建一个Phaser;默认parties个数为0。此后我们可以通过register()、bulkRegister()方法来注册新的parties。每个Phaser实例内部,都持有几个状态数据:termination状态、已经注册的parties个数(registeredParties)、当前phase下已到达的parties个数(arrivedParties)、当前phase周期数,还有2个同步阻塞队列Queue。Queue中保存了所有的waiter,即因为advance而等待的线程信息;这两个Queue分别为evenQ和oddQ,这两个Queue在实现上没有任何区别,Queue的元素为QNode,每个QNode保存一个waiter的信息,比如Thread引用、阻塞的phase、超时的deadline、是否支持interrupted响应等。两个Queue,其中一个保存当前phase中正在使用的waiter,另一个备用,当phase为奇数时使用evenQ、oddQ备用,偶数时相反,即两个Queue轮换使用。当advance事件触发期间,新register的parties将会被放在备用的Queue中,advance只需要响应另一个Queue中的waiters即可,避免出现混乱。

 

    2、Phaser(int parties):构造函数,初始一定数量的parties;相当于直接regsiter此数量的parties。

    3、arrive():到达,阻塞,等到当前phase下其他parties到达。如果没有register(即已register数量为0),调用此方法将会抛出异常,此方法返回当前phase周期数,如果Phaser已经终止,则返回负数。

    4、arriveAndDeregister():到达,并注销一个parties数量,非阻塞方法。注销,将会导致Phaser内部的parties个数减一(只影响当前phase),即下一个phase需要等待arrive的parties数量将减一。异常机制和返回值,与arrive方法一致。

    5、arriveAndAwaitAdvance():到达,且阻塞直到其他parties都到达,且advance。此方法等同于awaitAdvance(arrive())。如果你希望阻塞机制支持timeout、interrupted响应,可以使用类似的其他方法(参见下文)。如果你希望到达后且注销,而且阻塞等到当前phase下其他的parties到达,可以使用awaitAdvance(arriveAndDeregister())方法组合。此方法的异常机制和返回值同arrive()。

    6、awaitAdvance(int phase):阻塞方法,等待phase周期数下其他所有的parties都到达。如果指定的phase与Phaser当前的phase不一致,则立即返回。

    7、awaitAdvanceInterruptibly(int phase):阻塞方法,同awaitAdvance,只是支持interrupted响应,即waiter线程如果被外部中断,则此方法立即返回,并抛出InterrutedException。

    8、awaitAdvanceInterruptibly(int phase,long timeout,TimeUnit unit):阻塞方法,同awaitAdvance,支持timeout类型的interrupted响应,即当前线程阻塞等待约定的时长,超时后以TimeoutException异常方式返回。

    9、forceTermination():强制终止,此后Phaser对象将不可用,即register等将不再有效。此方法将会导致Queue中所有的waiter线程被唤醒。

    10、register():新注册一个party,导致Phaser内部registerPaties数量加1;如果此时onAdvance方法正在执行,此方法将会等待它执行完毕后才会返回。此方法返回当前的phase周期数,如果Phaser已经中断,将会返回负数。

    11、bulkRegister(int parties):批量注册多个parties数组,规则同10、。

    12、getArrivedParties():获取已经到达的parties个数。

    13、getPhase():获取当前phase周期数。如果Phaser已经中断,则返回负值。

    14、getRegisteredParties():获取已经注册的parties个数。

    15、getUnarrivedParties():获取尚未到达的parties个数。

代码示例:

  1. //创建时,就需要指定参与的parties个数  

  2. int parties = 12;  

  3. //可以在创建时不指定parties  

  4. // 而是在运行时,随时注册和注销新的parties  

  5. Phaser phaser = new Phaser();  

  6. //主线程先注册一个  

  7. //对应下文中,主线程可以等待所有的parties到达后再解除阻塞(类似与CountDownLatch)  

  8. phaser.register();  

  9. ExecutorService executor = Executors.newFixedThreadPool(parties);  

  10. for(int i = 0; i < parties; i++) {  

  11.     phaser.register();//每创建一个task,我们就注册一个party  

  12.     executor.execute(new Runnable() {  

  13.         @Override  

  14.         public void run() {  

  15.             try {  

  16.                 int i = 0;  

  17.                 while (i < 3 && !phaser.isTerminated()) {  

  18.                     System.out.println("Generation:" + phaser.getPhase());  

  19.                     Thread.sleep(3000);  

  20.                     //等待同一周期内,其他Task到达  

  21.                     //然后进入新的周期,并继续同步进行  

  22.                     phaser.arriveAndAwaitAdvance();  

  23.                     i++;//我们假定,运行三个周期即可  

  24.                 }  

  25.             } catch (Exception e) {  

  26.   

  27.             }  

  28.             finally {  

  29.                 phaser.arriveAndDeregister();  

  30.             }  

  31.         }  

  32.     });  

  33. }  

  34. //主线程到达,且注销自己  

  35. //此后线程池中的线程即可开始按照周期,同步执行。  

  36. phaser.arriveAndDeregister();  

参考内容:http://shift-alt-ctrl.iteye.com/blog/2302923


5.Exchanger

类java.util.concurrent.Exchanger提供了一个同步点,在这个同步点,一对线程可以交换数据。每个线程通过exchange()方法的入口提供数据给他的伙伴线程,并接收他的伙伴线程提供的数据,并返回。

当在运行不对称的活动时很有用。比如说,一个线程向buffer中填充数据,另一个线程从buffer中消费数据;这些线程可以用Exchange来交换数据。这个交换对于两个线程来说都是安全的。


package com.clzhang.sample.thread;import java.util.*;import java.util.concurrent.Exchanger;public class SyncExchanger {    

private static final Exchanger exchanger = new Exchanger();    

class DataProducer implements Runnable {        

private List list = new ArrayList();      public void run() {          
         
for (int i = 0; i < 5; i++) {                System.out.println("生产了一个数据,耗时1秒");                list.add(new Date());                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }            try {                list = (List) exchanger.exchange(list);            } catch (InterruptedException e) {                e.printStackTrace();            }          
         
for (Iterator iterator = list.iterator(); iterator.hasNext();) {                System.out.println("Producer " + iterator.next());            }        }    }    

class DataConsumer implements Runnable {        

           private List list = new ArrayList();
           public void run() {            
           
for (int i = 0; i < 5; i++) {                list.add("这是一个收条。");            }            

           try {                list = (List) exchanger.exchange(list);            } catch (InterruptedException e) {                e.printStackTrace();            }            

           for (Iterator iterator = list.iterator(); iterator.hasNext();) {                Date d = (Date) iterator.next();                System.out.println("Consumer: " + d);            }        }    }    

       public static void main(String[] args) {         SyncExchanger ins = new SyncExchanger();        
       
new Thread(ins.new DataProducer()).start();        
       
new Thread(ins.new DataConsumer()).start();    } }

输出
生产了一个数据,耗时1秒
生产了一个数据,耗时1秒
生产了一个数据,耗时1秒
生产了一个数据,耗时1秒
生产了一个数据,耗时1秒
Producer 这是一个收条。
Producer 这是一个收条。
Producer 这是一个收条。
Producer 这是一个收条。
Producer 这是一个收条。
Consumer: Thu Sep 12 17:21:39 CST 2013
Consumer: Thu Sep 12 17:21:40 CST 2013
Consumer: Thu Sep 12 17:21:41 CST 2013
Consumer: Thu Sep 12 17:21:42 CST 2013
Consumer: Thu Sep 12 17:21:43 CST 2013


今日励志鸡汤:只有不停的深入,你才会觉得快乐(我指的是学习)

(偷偷告诉你,输入技术,你会有意想不到的收获哟~)


看完本文有收获?请转发分享给更多人,转载只需三秒

如果你有好的建议,请底下留言,我们共同成长

关注 励志程序员 ,编程更有趣







以上是关于多线程必看之JAVA线程并发辅助类的主要内容,如果未能解决你的问题,请参考以下文章

Java进阶之光!2021必看-Java高级面试题总结

Java——多线程高并发系列之JUC三大辅助类(CountDownLatchCyclicBarrierSemaphore)

Java——多线程高并发系列之JUC三大辅助类(CountDownLatchCyclicBarrierSemaphore)

Java多线程与并发库高级应用-工具类介绍

Java多线程与并发库高级应用-工具类介绍

java规范化快捷键,Java开发者必看