Java中的并发工具类
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java中的并发工具类相关的知识,希望对你有一定的参考价值。
CountDownLatch
CountDownLatch允许一个或多个线程等待其他线程完成操作
当需要解析一个Excel里面有多个sheet数据时,可以使用多线程,每个线程解析一个sheet里的数据。主线程等待所有线程执行完sheet的解析操作。
public class JoinCountDownLatchTest(){ public static void main(String[] args){ Thread parser1 = new Thread(new Runnable)_{ public void run(){ } }); Thread parser2 = new Thread(new Runnable)_{ public void run(){ } }); parser1.start(); parser2.start(); parser1.join(); parser1.join(); System.out.print("all parsers finished"); } }
join用于让当前执行线程等待join线程执行结束。其原理是不停检查join线程是否存活,若join线程存活则让当前线程永远等待。直到join()线程中之后,线程的this.notifyAll()方法会被调用,调用notifyAll()方法是JVM里实现的。
public class CountDownLatchTest{ static CountDownLatch c = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException{ new Thread(new Runnable(){ System.out.println(1); c.countDown(); System.out.println(2); c.countDown(); }).start(); c.wait(); System.out.println("3"); } }
CountDownLatch的构造函数接受一个int类型的参数作为计数器,若想等待N个点完成,就传入N。当我们调用CountDownLatch的countDown方法时,N就减1,CountDownLatch的await方法会阻塞当前线程,直到N变成0。由于countDown方法可以用在任何定法,即可以使N个线程。只需要把这个CountDownLatch的引用传递到线程里即可。若某个线程处理的比较慢,可以使用await的重载方法await(long time, TimeUnit unit)。
计数器必须大于0,当计数器等于0时,调用await()方法不会阻塞当前线程。CountDownLatch不可能重新初始化或修改CountDownLatch对象的内部计数器的值。一个线程调用countDown方法happen-before另外一个线程调用await方法。
同步屏障CyclicBarrier
CyclicBarrier的意思是可循环使用(Cyclic)的屏障(Barrier)。让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续执行。
CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await()方法高速CyclicBarrier我已经到达了屏障,然后当前贤臣被阻塞。
public class CyclicBarrireTest{ static CyclicBarrier c = new CyclicBarrier(2); public static void main(String[] args){ new Thread(new Runnable(){ public void run(){ try{ c.await(); }catch(Exception e){ } System.out.println(1); } }).start(); try{ c.await(); }catch(Exception e){ System.out.println(2); } } }
若把new CyclicBarrier(2)修改成new CyclicBarrire(3),则主线程和子线程会永远等待,因为没有第三个线程执行await方法,即没有第三个线程达到屏障,所以之前到达屏障的两个线程都不会继续执行。
CyclicBarrier还提供给了构造函数CyclicBarrier(int parties, Runnable barrierAction)用于在线程到达屏障时先执行barrierAction。
public class CyclicBarrierTest2{ static CyclicBarrier c = new CyclicBarrier(2, new A()); public static void main(String[] args){ new Thread(new Runnable(){ public void run(){ try{ c.await(); }catch(Exception e){} System.out.println(1); } }).start(); try{ c.await(); }catch(Exception e){} System.out.println(2); } static class A implements Runnable{ public void run(){ System.out.println(3); } } }
CyclicBarrier可用于多线程计算数据最后合并计算结果的场景。
public class BankWaterService implements Runnable{ private CyclicBarrier c = new CyclicBarrier(4, this); private Executor executor = Executors.newFixedThreadPool(4, this); private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<String, Integer>(); private void count(){ for(int i = 0; i < 4; i++){ executor.execute(new Runnable(){ public void run(){ sheetBankWaterCount.put(Thread.currentThread().getName(), 1); try{ c.await(); }catch(InterruptedException | BrokenBarrierException e){ e.printStackTrace(); } } }); } } public void run(){ int result = 0; for(Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()){ result += sheet.getValue(); } sheetBankWaterCount.put("result". result); System.out.println(result); } public static void main(String[] args){ BankWaterService bankWaterCount = new BankWaterService(); bankWaterCount.count(); } }
public class BankWaterService implements Runnable{ private CyclicBarrier c = new CyclicBarrier(4, this); private Executor executor = Executors.newFixedThreadPool(4, this); private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<String, Integer>(); private void count(){ for(int i = 0; i < 4; i++){ executor.execute(new Runnable(){ public void run(){ sheetBankWaterCount.put(Thread.currentThread().getName(), 1); try{ c.await(); }catch(InterruptedException | BrokenBarrierException e){ e.printStackTrace(); } } }); } } public void run(){ int result = 0; for(Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()){ result += sheet.getValue(); } sheetBankWaterCount.put("result". result); System.out.println(result); } public static void main(String[] args){ BankWaterService bankWaterCount = new BankWaterService(); bankWaterCount.count(); } }
CyclicBarrier和CountDownLatch的区别
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。CyclicBarrier还提供了getNumberWaiting方法获取CyclicBarrier阻塞的线程数量。isBroker()方法用来了解阻塞的线程是否被中断。
Semaphore
Semaphore是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
Semaphore可以用于做流量控制,特别是公共资源有限的应用场景,如连接数据库。
public class SemaphoreTest(){ private static final int THREAD_COUNT = 30; private static final ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); private static Semaphore s = new Semaphore(10); public static void main(String[] args){ for(int i = 0; i < THREAD_COUNT; i++){ threadPool.executor(new Runnable(){ try{ s.acquire(); System.out.println("save data"); s.release(); }catch(InterruptedException e){} }); } threadPool.shutdown(); } }
Semaphore提供如下方法:
acquire():获取一个许可证
release():归还许可证
int avaliablePermits():返回信号量中当前可用的许可证书
int getQueueLength():返回正在等待获取许可证的线程书
boolean hasQueuedThreads():是否有线程正在等待获取许可证
void reducePermits(int reduction):减少reduction个许可证,protected
Collection getQueuedThreads():返回所有等待获取许可证的线程集合,protected
Exchanger
Exchanger是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,若第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange()方法,当两个线程都到达同步点时,这两个线程可以交换数据,将本线程生产出的数据传递给对方。
Exchange可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候交换两个人数据,并使用交叉规则得出2个交配的结果。同时Exchanger也可以用于校对工作。为了避免错误,采用AB岗两人进行录入数据到Excel后,系统加载两个Excel并对两个Excel数据进行校对。
public class ExchangerTest{ private static final Exchanger<String> exgr = new Exchanger<String>(); private static ExecutorService threadPool = Executors.newFixedThreadPool(2); public static void main(String[] args){ threadPool.execute(new Runnble(){ public void run(){ try{ String A = "banker A"; exgr.exchange(A); }catch(InterruptedException){} } }); threadPool.execute(new Runnable(){ public void run(){ String B = "banker B"; String A = exgr.exchange(B); System.out.println("A equals to B : " + A.equals(B)); } }); threadPool.shutdown(); } }
若两个线程有一个没有执行exchange()方法,则会一直等待。此时为避免一直等候,可以调用exchange(V x, long timeout, TimeUnit unit)来设置最大等待时长。
以上是关于Java中的并发工具类的主要内容,如果未能解决你的问题,请参考以下文章
elasticsearch代码片段,及工具类SearchEsUtil.java