Java的并发神器concurrent包详解
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java的并发神器concurrent包详解相关的知识,希望对你有一定的参考价值。
在JDK 1.5之前,提到并发,java程序员们一般想到的是wait()、notify()、Synchronized关键字等,但是并发除了要考虑竞态资源、死锁、资源公平性等问题,往往还需要考虑性能问题,在一些业务场景往往还会比较复杂,这些都给java coder们造成不小的难题。JDK 1.5的concurrent包帮我们解决了不少问题。
Concurrent包中包含了几个比较常用的并发模块,这个系列,LZ就和大家一起来学习各个模块,Let’s Go!
一、线程池的基本用法
一般并发包里有三个常用的线程池实例化方法,在Executors这个工厂类中。
- ·newFixedThreadPool(int size):
创建一个可重用固定线程数的线程池,以共享的无界队列(LinkedBlockingQueue)方式来运行这些线程。任何时刻最多只有size大小的线程在执行各自任务。当线程池中所有线程都在运行,后续新来的线程将在无界队列中等待,直到有空闲线程为止。如果有线程在执行任务期间终止退出,一个新的线程会替代原先的线程继续后面任务的执行。线程池中的线程会一直存在,除非显示的关闭某个线程。
ExecuterService fixedPool = Executors. newFixedThreadPool(5); //创建一个固定包含5个线程的线程池
- ·newCachedThreadPool()
创建一个可以根据需要创建线程的线程池,当一个任务过来,如果线程池中有空闲可用线程,先用空闲线程,没有就创建一个新线程。这种线程池对于执行short-lived asynchronous tasks(短期异步任务)通常会提高性能。默认空闲60s的线程会从线程池中移除,如果长时间空闲的话,这个线程池是不会占用任何资源的。但是以我个人的经验,这种线程池要慎用,用不好经常会造成堆外内存溢出,从而造成应用反应慢,最后导致宕机。
ExecuterService fixedPool = Executors. newCachedThreadPool();
- ·newSingleThreadExecutor()
创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。可以保证顺序执行各个任务。有人说这不就是newFixedThreadPool(1)么,错矣。看源码就知道外部还包了一层FinalizableDelegatedExecutorService,这个是Executors的一个静态内部类,这个内部类继承了DelegatedExecutorService,返回的这个ExecutorService实现只包含ExecutorService接口定义的一些方法,不同于newFixedThreadPool有自己扩展的方法。源码注释的区别是:newSingleThreadExecutor可以保证使用其他线程时,返回的Executor不用重新配置。
ExecuterService fixedPool = Executors. newSingleThreadExecutor();
- ·newScheduledThreadPool(int size)
创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。这是个无界大小的线程池。其中主要的执行方法是scheduleAtFixedRate和scheduleWithFixedDelay方法,最终的核心方法是delayedExecute,这两个方法有什么区别呢,简单来说,scheduleAtFixedRate是以任务执行时间开始为延迟时间起点的,scheduleWithFixedDelay是以任务执行结束时间为延时时间起点的。
接下来对并发包里面常用的一些类进行使用解析
1) 信号量 Semaphore:
一个计数信号量,维护了一个许可集。通俗的讲,就是对有限共享资源记录使用情况。
举个日常生活中的例子好了:有20个人去登记信息表格,但是只有3只笔,一个人用完之后给下面一个人用,直到所有人都用完。
没有并发包之前,我们是怎么设计的呢?
定义一个变量a,用synchronized修饰,值为3,新建20个线程模拟20个人访问,每个人用笔之前,先看是否有空余的笔,如果有,就使用,并将变量a减少1,使用完成之后,再放回去,即将变量a加1,各个线程之间对于变量a的访问需要互斥进行。Synchronized是一个重量级的同步操作,涉及到用户态与核心态的切换,所以性能一般。
让我们再看看信号量是怎么实现的,Semaphore实现的思想跟上面差不多。只不过Semaphore已经帮我们做了同步处理,有两个关键的方法:
- ·acquire() 获取一个信号量许可
- ·release() 释放一个信号量许可
此外还有其他一些方法:availablePermits() 返回信号量可用的许可数,talk is cheap show me the code:
public class SemaphoreTest extends Thread{
private String name;
private Semaphore sh;
public SemaphoreTest(String name,Semaphore sh){
this.name = name;
this.sh = sh;
}
public void run(){
if(sh.availablePermits()>0){
System.out.println("有笔");
}else{
System.out.println("笔没了,等等");
}
try {
sh.acquire();//信号量减1
System.out.println(this.name+"号在用笔");
Thread.sleep((long) (Math.random()*1000));
sh.release();
System.out.println(this.name+"号用完了");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static void main(String[] args) {
ExecutorService es = Executors.newCachedThreadPool();
Semaphore sh = new Semaphore(3);
for(int i=0;i<20;i++){
es.submit(new SemaphoreTest(i+"",sh));
}
es.shutdown();
}
}
上述代码直接运行即可。
这个信号量是如何同步资源的,就需要读读jdk相关源码了。核心类是AbstractQueuedSynchronizer,该类实现了一个FIFO的列表,列表中Node表示列表节点,该Node有prev、next、Thread等属性,表示前节点后节点以及线程等。并发包中每个需要实现个性化同步机制的都要扩展该类,以实现不同的同步功能。比如Semaphore、countDownLatch、CyclicBarrier等,具体原理要分析源码,篇幅较长,这里不做展开,有兴趣的可以研究一下,对volatile关键字、CAS等都会有更深的理解。
2) CountDownLatch
用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。
比较重要的2个方法:countDown()和await()
下面来个常见的例子:导游带了20人团到了饭店吃饭,要等到20个人到齐了,才开始吃饭,还要等到20人都吃完了,才可以继续下个景点。
来看看不用并发包你会怎么实现,首先会设置20个人到来以及吃完的标志,每个线程过来更改自己的标志,主程序for或者while不停的循环监听,制止全部到来以及吃完才继续。如果是100人1000人呢,未免臃肿。
CountDownLatch的实现:
public class CountDownLautchTest {
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(20);
final CountDownLatch endFlag = new CountDownLatch(20);
final CountDownLatch startFlag = new CountDownLatch(1);
final CountDownLatch comeFlag = new CountDownLatch(20);
for(int i=0;i<20;i++){
final int j = i + 1;
Runnable person = new Runnable(){
@Override
public void run() {
System.out.println(j+"号游客来了");
comeFlag.countDown();
try {
startFlag.await();
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
System.out.println(j+"号吃完了");
endFlag.countDown();
}
}
};
es.submit(person);
}
try {
comeFlag.await(); //保证所有人都到齐了
System.out.println("人都齐了,大家一起吃饭");
startFlag.countDown();//开吃
endFlag.await();//等待所有人都吃完了
System.out.println("全都吃完了,继续下个景点");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
es.shutdown();
}
}
CountDownLatch是一种不可逆的降序计数操作,所以上述代码里面定义了2个分别表示等候以及吃饭的标志。如果需要重复计数,就需要用到下面这个类:CyclicBarrier。
3) CyclicBarrier
一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。
还是上述的例子吧,再加一点场景,吃完饭还要游玩,游玩结束返程回家。让我们来看看CyclicBarrier是如何实现的,上代码:
public class CyclicBarrierTest {
public static void main(String[] args) {
final CyclicBarrier barrier = new CyclicBarrier(20);
final ExecutorService es = Executors.newFixedThreadPool(20);
System.out.println("****等人齐****");
for(int i=0;i<20;i++){
final int j = i+1;
Runnable person = new Runnable(){
@Override
public void run() {
try {
System.out.println(j+"号来了");
if(barrier.await()==0){
System.out.println("****人都到齐了****");
}
System.out.println(j+"号开吃");
if(barrier.await()==0){
System.out.println("****吃完了出发****");
}
System.out.println(j+"号玩好了");
if(barrier.await()==0){
System.out.println("****游玩结束回家****");
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
es.submit(person);
}
es.shutdown();
}
}
代码直接运行即可,应该比较好懂吧,这里有个小窍门,代码中if(barrier.await()==0)这个条件,有点类似于CountDownLatch的await往下执行的条件了,每次判断都会减1,减到0,即完成所有线程的等待,继续下面的操作。
结语
关于并发包一些常用的类就介绍到这里吧,工作中具体怎么使用要具体分析各个业务场景,选择合适的方法,随机应变,触类旁通吧,后续会介绍一下Lock源码以及实现,涉及AQS的原理等。
未完,待续……
以上是关于Java的并发神器concurrent包详解的主要内容,如果未能解决你的问题,请参考以下文章