Java中的并发工具类

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java中的并发工具类相关的知识,希望对你有一定的参考价值。

CountDownLatch  

  CountDownLatch允许一个或多个线程等待其他线程完成操作

  当需要解析一个Excel里面有多个sheet数据时,可以使用多线程,每个线程解析一个sheet里的数据。主线程等待所有线程执行完sheet的解析操作。

  

public class JoinCountDownLatchTest(){

  public static void main(String[] args){
    Thread parser1 = new Thread(new Runnable)_{
      public void run(){
      }
    });
    Thread parser2 = new Thread(new Runnable)_{
      public void run(){
      }
    });
    parser1.start();
    parser2.start();
    parser1.join();
    parser1.join();
    System.out.print("all parsers finished");
  }

}

  join用于让当前执行线程等待join线程执行结束。其原理是不停检查join线程是否存活,若join线程存活则让当前线程永远等待。直到join()线程中之后,线程的this.notifyAll()方法会被调用,调用notifyAll()方法是JVM里实现的。

public class CountDownLatchTest{
  static CountDownLatch c = new CountDownLatch(2);
  public static void main(String[] args) throws InterruptedException{
    new Thread(new Runnable(){
      System.out.println(1);
      c.countDown();
      System.out.println(2);
      c.countDown();
    }).start();
    c.wait();
    System.out.println("3");
  }
}

  CountDownLatch的构造函数接受一个int类型的参数作为计数器,若想等待N个点完成,就传入N。当我们调用CountDownLatch的countDown方法时,N就减1,CountDownLatch的await方法会阻塞当前线程,直到N变成0。由于countDown方法可以用在任何定法,即可以使N个线程。只需要把这个CountDownLatch的引用传递到线程里即可。若某个线程处理的比较慢,可以使用await的重载方法await(long time, TimeUnit unit)。

  计数器必须大于0,当计数器等于0时,调用await()方法不会阻塞当前线程。CountDownLatch不可能重新初始化或修改CountDownLatch对象的内部计数器的值。一个线程调用countDown方法happen-before另外一个线程调用await方法。

 

同步屏障CyclicBarrier

  CyclicBarrier的意思是可循环使用(Cyclic)的屏障(Barrier)。让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续执行。

  CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await()方法高速CyclicBarrier我已经到达了屏障,然后当前贤臣被阻塞。

public class CyclicBarrireTest{
  static CyclicBarrier c = new CyclicBarrier(2);
  public static void main(String[] args){
    new Thread(new Runnable(){
      public void run(){
        try{
          c.await();
        }catch(Exception e){
        }
        System.out.println(1);
      }
    }).start();
    try{
      c.await();
    }catch(Exception e){
      System.out.println(2);
    }
  }
}

  若把new CyclicBarrier(2)修改成new CyclicBarrire(3),则主线程和子线程会永远等待,因为没有第三个线程执行await方法,即没有第三个线程达到屏障,所以之前到达屏障的两个线程都不会继续执行。

  CyclicBarrier还提供给了构造函数CyclicBarrier(int parties, Runnable barrierAction)用于在线程到达屏障时先执行barrierAction。

public class CyclicBarrierTest2{
  static CyclicBarrier c = new CyclicBarrier(2, new A());
  public static void main(String[] args){
    new Thread(new Runnable(){
      public void run(){
        try{
          c.await(); 
        }catch(Exception e){}
        System.out.println(1);
      }
    }).start();
    try{ 
      c.await();
    }catch(Exception e){}
    System.out.println(2);
  }

  static class A implements Runnable{
    public void run(){
      System.out.println(3);
    }
  }
}

 

  CyclicBarrier可用于多线程计算数据最后合并计算结果的场景。

public class BankWaterService implements Runnable{
  private CyclicBarrier c = new CyclicBarrier(4, this);
  private Executor executor = Executors.newFixedThreadPool(4, this);
  private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<String, Integer>();
  
  private void count(){
    for(int i = 0; i < 4; i++){
      executor.execute(new Runnable(){
        public void run(){
          sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
          try{
           c.await(); 
          }catch(InterruptedException | BrokenBarrierException e){
            e.printStackTrace();
          }
        }
      });
    }
  }

  public void run(){
    int result = 0;
    for(Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()){
      result += sheet.getValue();
    }
    sheetBankWaterCount.put("result". result);
    System.out.println(result);
  }

  public static void main(String[] args){
    BankWaterService bankWaterCount = new BankWaterService();
    bankWaterCount.count();
  }
  
}
public class BankWaterService implements Runnable{
  private CyclicBarrier c = new CyclicBarrier(4, this);
  private Executor executor = Executors.newFixedThreadPool(4, this);
  private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<String, Integer>();
  
  private void count(){
    for(int i = 0; i < 4; i++){
      executor.execute(new Runnable(){
        public void run(){
          sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
          try{
           c.await(); 
          }catch(InterruptedException | BrokenBarrierException e){
            e.printStackTrace();
          }
        }
      });
    }
  }

  public void run(){
    int result = 0;
    for(Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()){
      result += sheet.getValue();
    }
    sheetBankWaterCount.put("result". result);
    System.out.println(result);
  }

  public static void main(String[] args){
    BankWaterService bankWaterCount = new BankWaterService();
    bankWaterCount.count();
  }
  
}

 

CyclicBarrier和CountDownLatch的区别

  CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。CyclicBarrier还提供了getNumberWaiting方法获取CyclicBarrier阻塞的线程数量。isBroker()方法用来了解阻塞的线程是否被中断。

 

Semaphore

  Semaphore是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

  Semaphore可以用于做流量控制,特别是公共资源有限的应用场景,如连接数据库。

public class SemaphoreTest(){
  private static final int THREAD_COUNT = 30;
  private static final ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
  private static Semaphore s = new Semaphore(10);
  public static void main(String[] args){
    for(int i = 0; i < THREAD_COUNT; i++){
      threadPool.executor(new Runnable(){
        try{
         s.acquire();
         System.out.println("save data");
         s.release();
        }catch(InterruptedException e){}
      });
    }
    threadPool.shutdown();
  }
}

  Semaphore提供如下方法:

    acquire():获取一个许可证

    release():归还许可证

    int avaliablePermits():返回信号量中当前可用的许可证书

    int getQueueLength():返回正在等待获取许可证的线程书

              boolean hasQueuedThreads():是否有线程正在等待获取许可证

    void reducePermits(int reduction):减少reduction个许可证,protected

    Collection getQueuedThreads():返回所有等待获取许可证的线程集合,protected

 

Exchanger

  Exchanger是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,若第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange()方法,当两个线程都到达同步点时,这两个线程可以交换数据,将本线程生产出的数据传递给对方。

  Exchange可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候交换两个人数据,并使用交叉规则得出2个交配的结果。同时Exchanger也可以用于校对工作。为了避免错误,采用AB岗两人进行录入数据到Excel后,系统加载两个Excel并对两个Excel数据进行校对。

public class ExchangerTest{
  private static final Exchanger<String> exgr = new Exchanger<String>();
  private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
  
  public static void main(String[] args){
    threadPool.execute(new Runnble(){
      public void run(){
        try{
          String A = "banker A";
          exgr.exchange(A);
        }catch(InterruptedException){}
      }
    });
    threadPool.execute(new Runnable(){
      public void run(){
        String B = "banker B";
        String A = exgr.exchange(B);
        System.out.println("A equals to B : " + A.equals(B));
      }
    });
  threadPool.shutdown(); 
  }
}

  若两个线程有一个没有执行exchange()方法,则会一直等待。此时为避免一直等候,可以调用exchange(V x, long timeout, TimeUnit unit)来设置最大等待时长。

以上是关于Java中的并发工具类的主要内容,如果未能解决你的问题,请参考以下文章

elasticsearch代码片段,及工具类SearchEsUtil.java

Java并发编程-CountDownLatch

Java并发编程-CountDownLatch

solr分布式索引实战分片配置读取:工具类configUtil.java,读取配置代码片段,配置实例

第八章 Java中的并发工具类

Java中的并发工具类