java并发总结
Posted lovejune
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java并发总结相关的知识,希望对你有一定的参考价值。
并发是个系统的知识体系,有理论上的,有语言上的,有概念上的,这份总结力求简单,看的懂,而不是用一大堆源码和概念去描述
java的内存模型
java的内存模型被称为JMM,从上图可以看出,java内存模型主要是针对多线程。
为什么要先说java的内存模型,事实上,涉及到线程之间通信的两种模型;
第一种是消息传递,这种通信方式对程序员是不透明的,即程序员必须显示的用一个线程发消息,用另一个线程接收消息(例如RabbitMQ)。这种方式在线程上显示的规定了前后关系,发消息必须在接收消息之前。
第二种是共享内存,这种方式对程序员透明,通信过程是通过隐式进行。具体来说是,前一个线程将要传递的值存放在共享内存里面,而后一个线程会去取。这种方式如果是单线程模式,则不会出现问题,虽然JVM也会进行重排序优化,但不会影响运行结果,但对于多线程来说,即使不用重排序技术,线程之间通过共享内存通信也会出现很严重的同步问题。
其原因在于:
1.多线程之间并发进行,抢占CPU的资源,而不会同步进行(并发与同步相生相克,如果同步了,何必要并发),本来要发消息的线程在接收消息的线程之后执行,导致接收到的消息不准确。
2.java虚拟机会对程序进行重排序,导致线程运行的程序语句没有按照顺序执行,在多线程中,出现了很大的错误。
3.发消息的线程只把要发的数据存在本地内存中(逻辑概念,物理上不存在),而当收消息的线程执行时,还没有来得及将其刷新到共享内存中去(主内存)。
以上三个原因导致多线程使用共享内存模式出现了一系列的问题。
解决方法:同步。(使发消息的线程永远执行在收消息的线程之前)
同步问题需要解决很多问题,第一个就是可见性,一个线程对主内存数据如何对另一个线程可见,其中涉及到的就是顺序问题,JMM中提供了happens-before关系。向程序员保证,只要你编写的两个线程满足happens-before关系,那么可见性问题就会解决,即一个线程对主内存的修改对另一线程就是可见的。
具体的关系遵守的规则如下:
1.程序顺序规则:一个线程中的每个操作,happens-before 于该线程中的任意后续操作。
解读:程序顺序规则是作用在一个线程之内,一个线程之内运行的多行程序,需要按照顺序来执行,A操作对内存的修改要想被B操作获取,必须A操作在B操作之前,即使重排序了,对于执行结果而言,A操作的结果也在B操作的结果之前。
2.监视器锁规则:对一个监视器的解锁,happens-before 于随后对这个监视器的加锁。
解读:这个很好理解,监视器就是一个管程,也就是一个锁,这个不管单线程和多线程,一个线程对一段代码加锁之后,能看到前一个线程解锁之前的操作结果。也就是解锁的过程就是将本地内存的值刷新到主内存的过程。
3.volatile变量规则:对一个 volatile 域的写,happens-before 于任意后续对这个 volatile 域的读。
解读:volatile规则就是要一个线程先去斜volatile,然后一个线程去读这个volatile变量,那么才能保证是可见的。
接下来看一道面试题
public class ThreadSafeCache { int result; public int getResult() { return result; } public synchronized void setResult(int result) { this.result = result; } public static void main(String[] args) { ThreadSafeCache threadSafeCache = new ThreadSafeCache(); for (int i = 0; i < 8; i++) { new Thread(() -> { int x = 0; while (threadSafeCache.getResult() < 100) { x++; } System.out.println(x); }).start(); } try { //主线程等待1s,等带其他线程执行 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } threadSafeCache.setResult(200); } }
问,当get方法不加锁,而set方法加锁,那么一个线程的set方法是否会对另一个线程的get方法可见呢,假设:线程执行的顺序没有问题。
事实上,如果是一个是只有两个线程,如果一个线程先set了,然后另一个线程get,除了这两个操作没有任何其他操作,那么这时候是可见的。这是因为,当执行set方法的时候,因为加了锁,会让set执行线程将本地内存中已经set的值刷新到主内存中去,而get方法在后面执行,因此一定会从主内存中取到这个已经修改的值,存到本地内存里面去。
但上面的程序为什么陷入了死循环呢,这是因为程序一执行,先让八个线程都执行了一遍get方法,而get方法会得到现在主线程中的值,存入本地内存中,因为get方法没有加锁,因此,在后面再执行get方法,它会一直在本地内存中去取值,因此,取到的都是原来的值,现在即使你利用主线程将set方法执行了,也不会得到set执行后主内存内的新值了。
修改,第一种方式,给get方法也加锁,根据happens-before(监视器锁规则)规则,当主线程将set方法执行后,子线程的get方法也能从主内存中获取到值(刷新),这样就可以退出死循环。
第二种,不加锁,给全局变量result加一个volatile(volatile规则),这样就会在执行时自动刷新,保证了可见性。
第三种,将全局变量定义为final,那么这样无论哪个线程都无法改变全局变量的值,一定保证可见,(但这有什么意义,根本和线程通信无关)
以上就是可见性问题的解决,但需要指出的是volatile只会保证全局数据的可见性,但同步关系,不能保证,而且在非原子操作中也会发生线程之间的安全问题。
上面的可见性,我们解决了共享内存的通信问题,但从通信中扩展,我要发送的一只一条数据,要好多条,那么问题来了,我怎么能确保我发送的数据,和你接收到的数据顺序一致呢。
如果只满足了上面的可见性,那么只能保证我发送一条数据,你肯定能接收到;但我如果发送多个数据,那么你接收的顺序和我发送的顺序就会不一致了。
比如上面的图,A操作为发送后的数据,和B接收的数据不一致。根本原因还是并发导致的,因此,要想得到相同的数据,需要将A要发送的操作和B接收的操作原子化
何谓原子化:
就是等A线程的四条操作都执行完以后,再进行发送,然后B的接收也是类似,当都接收完了,在进行汇总。
那么哪些操作是原子操作呢(即不需要原子化的操作):事实上,观察一个操作是不是原子操作,不能看其语言本身,而是要看他在内存中执行了几个指令,如果只执行一个指令,那么就是原子的,否则就不是;比如i++,其实在内存中执行了三个指令,不是原子操作。
所有不是原子操作的的操作都有线程安全问题。
1.volatile能保证原子操作吗?
显然不能,volatile只能保证可见性,不能保证原子性。(见上面分析)
针对i++的操作,计算机会执行三条指令来运行:
计算机将内存中的变量值读入cpu的寄存器--------(人看到-笔记本上的i 的值 ,把这个值写入 大脑的临时记忆区)
cpu 对其 + 1运算并运算后的值放入寄存器-------- (大脑对 这个 值进行加一运算,把这个值写入大脑的临时记忆区)
把这个值写入内存--------(把这个值写到笔记本上)
使用volatile指令,只能保证上面每条指令的对后面线程的可见性,但其实+1这条指令很可能执行的顺序有问题,导致没有执行+1操作。
2.加锁
对要做的操作加锁是可以保证操作的原子性的。为什么呢?
锁其实就是管程,保持线程同步。在执行+1操作之前,当前执行线程会获取到锁,获取到以后,其他线程暂时就不能执行这段加锁代码了,因此保证了+1操作的原子性。当线程执行完+1操作以后,会重新释放锁,让其他线程执行。
3.使用java自带的API
使用AtomicInteger中的方法可以保证原子性。(原理待解决)
从以上分析可以看出,解决原子性最好的方法还是加锁。
接下来,再聊聊线程间的通信,之前说过线程之间通信的两种方式,但其实,细分下来还包括很多种:
1.synchronized和volatile关键字,这两种上面已经分析清楚了,主要就是操作主内存。
2.等待/通知机制,这里主要用到wait和notify方法。
为什么这两个方法可以实现线程通信呢?
每一个对象都有一个与之对应的监视器 每一个监视器里面都有一个该对象的锁和一个等待队列和一个同步队列
wait语义:一是释放当前对象锁,另一个是进入阻塞队列
notify()语义:把阻塞队列中的线程放到同步队列中去
通过上面分析就很清楚了,wait可以使一个线程进入阻塞状态,并只能通过notify()来唤醒,重新拥有获得锁的能力。
那么通过这两个Object类中的方法,就能通过一个线程将另一个线程唤醒,进行了通信。
这种等待/通知机制能很好的解决生产者-消费者问题。(待解决)
3.Thread.join()方法
通过阅读源码,可以知道,
具体是这样的过程,A线程在B线程中执行(一般场景为B是主线程,A是创建线程),当执行到threadA.join()之后,join方法会调用wait方法,将B线程阻塞掉。然后去执行线程A的run方法,当执行完A线程,系统会调用exit方法(不需要显示调用),执行notifyAll将B线程唤醒。,B线程接着执行。
join(long)方法的使用原则是,如果在long的时间之内,执行A的线程,等时间过了,执行B的线程。
join(long)和sleep(long)的区别:sleep的时间是固定的,当没有过完这段时间,线程不会苏醒,但join(long)不同,如果A在long之前执行完,那么B会被唤醒执行。因为exit方法会直接调用notifyall方法。
4.管道流:消息机制(待解决)
参考:《java并发编程的艺术》
对于线程而言,每一线程都在执行任务,那么如何安全的取消一个任务呢?
1.手动设置一个中断标志
分析,任务运行在一个while循环当中,while的条件就是手动设置的标志位,将该标志位设置为volatile,那么当另一个线程显示的将该标志位修改的时候,那么当前线程就可以退出while循环,将任务取消了。
这么做是最简单的取消方式,但它存在一个缺陷,如果while循环里面出现了故障,导致在while内部出现了阻塞,那么while循环的标志位永远也没有机会去验证是否被修改了,因此也无法安全的取消了(其实如果做个判断还是可以退出循环的,但这样做显然不是个优雅的方式)
public class PrimeGenerator implements Runnable { private static ExecutorService exec = Executors.newCachedThreadPool(); private final List<BigInteger> primes = new ArrayList<BigInteger>(); private volatile boolean cancelled; public void run() { BigInteger p = BigInteger.ONE; while (!cancelled) { p = p.nextProbablePrime(); synchronized (this) { primes.add(p); } } } public void cancel() { cancelled = true; } public synchronized List<BigInteger> get() { return new ArrayList<BigInteger>(primes); } static List<BigInteger> aSecondOfPrimes() throws InterruptedException { PrimeGenerator generator = new PrimeGenerator(); exec.execute(generator); try { SECONDS.sleep(1); } finally { generator.cancel(); } return generator.get(); } }
观察上面代码,是一个运行一秒钟的素数生成器,使用的是使用一个标志位来实现生成素数任务的取消。
2.使用线程的中断
中断表示的是线程中的一个标志位,为true,则表示一个运行中的线程是否被其他线程进行了中断操作,(其他线程通过调用运行中线程的interrupt()方法)。
中断有三个方法:isInterrupted()方法可以获取当前线程的中断状态;interrupted()方法会将中断标志位复位;以及调用中断的interrupt()方法
事实上,很多api会判断线程是否被中断,比如阻塞库里面的Thread.sleep和Object.wait,Object.join等,他们响应中断的方法就是,清除中断状态,然后抛出中断异常。
使用中断来取消任务
正如任务中应该包含取消策略一样,线程同样应该包含中断策略。中断策略规定线程如何解释某个中断请求——当发现中断请求时,应该做哪些工作(如果需要的话),哪些工作单元对于中断来说是原子操作,以及以多快的速度采响应中断。
最合理的中断策略是某种形式的线程级(Thread-Level)取消操作或服务级(Service-Level)取消操作:尽快退出,在必要时进行清理,通知某个所有者该线程已经退出。此外还可以建立其他的中断策略,例如暂停服务或重新开始服务,但对于那些包含非标准中断策略的线程或线程池,只能用于能知道这些策略的任务中。
3.使用Future
给一个 Runnable r 和时间 long timeout,解决“最多花 timeout 分钟运行 Runnable,没运行完就取消”这种要求。
private static final ExecutorService cancelExec = Executors.newCachedThreadPool(); public static void timedRun(Runnable r, long timeout, TimeUnit unit) { Future<?> task = cancelExec.submit(r); try { task.get(timeout, unit); } catch (TimeoutException e) { // 如果超时,抛出超时异常 } catch (ExecutionException e) { // 如果任务运行出现了异常,抛出任务的异常 throw launderThrowable(e.getCause()); } finally { // 如果任务已经结束,这句没影响 // 如果任务还在运行,这句会中断任务 task.cancel(true); } }
(后面会详细解释)
安全取消一个任务是属于业务逻辑上面的东西,与并发并没有直接的关系。
线程安全问题
线程安全的实质就是解决好同步问题。
synchronized:阻塞同步
使用方法:1.修饰代码块,锁对象是指定的。2.修饰普通方法,锁对象是调用该方法的对象。3.修饰静态方法,锁对象是该方法所在的Class对象。
synchronized是一个互斥锁,在同一时刻,只允许一个线程拿到锁对象。是个可重入锁,拿到锁的对象可以再次拿到锁。
synchronized的原理
每个对象都可以扮演一个用于同步的锁的角色,这些内置的锁被称为内部锁,或监视器锁。
1、如果monitor的进入数为0,则该线程进入monitor,然后将进入数设置为1,该线程即为monitor的所有者。
2、如果线程已经占有该monitor,只是重新进入,则进入monitor的进入数加1.
3、如果其他线程已经占用了monitor,则该线程进入阻塞状态,直到monitor的进入数为0,再重新尝试获取monitor的所有权。
执行monitorexit的线程必须是object ref所对应的monitor的所有者。
指令执行时,monitor的进入数减1,如果减1后进入数为0,那线程退出monitor,不再是这个monitor的所有者。其他被这个monitor阻塞的线程可以尝试去获取这个 monitor 的所有权。
上面这三个条件已经把synchronized的原理说清楚了,一段代码,或者一个变量可能有很多线程来执行,那么为了同步,需要拿这段代码的唯一对象作为锁,线程们只有拿到这个锁,才能执行这段代码,没有拿到就不可以执行,当执行完以后把锁释放了,其他的线程也可以竞争了(刚释放了锁的线程也可以竞争)
synchronized锁是一个非公平锁,有些线程可能一直无法执行。(公平锁是按照线程进行同步队列的顺序来执行的,非公平锁是如果当前有竞争的锁就先执行竞争的锁)
锁膨胀:synchronized锁在一定情况下会升级
先来介绍锁对象的概念:锁是被加到对象上面去的,被加的对象成为锁对象,每一个对象都可以成为锁对象(看看Object对象提供的那些方法就懂了)
从java对象的存储结构说起,分三个部分:对象头、实例数据、填充数据
上面是一个对象头的信息,可以看到markwork里面就存放着锁信息。
LockObject lockObject = new LockObject();//随便创建一个对象 synchronized(lockObject){ //代码 }
当执行以上代码的时候,Markword的部分信息如下
这是因为线程执行到同步代码区(临界区)的时候,会利用CAS(Compare and Swap)操作,将线程ID插入markwork中,同时修改偏向锁和标志位。
(比较并交换(compare and swap, CAS),是原子操作的一种,可用于在多线程编程中实现不被打断的数据交换操作,从而避免多线程同时改写某一数据时由于执行顺序不确定性以及中断的不可预知性产生的数据不一致问题。 该操作通过将内存中的值与指定数据进行比较,当数值一样时将内存中的数据替换为新的值。)
当偏向锁状态为1的时候,说明对象的偏向锁生效了,同时也可以看到是哪个线程获取了该对象的锁,(bit fields中的threadid)
什么是偏向锁:这个锁会偏向于第一个获得它的线程,在接下来的执行过程中,假如该锁没有被其他线程所获取,没有其他线程来竞争该锁,那么持有偏向锁的线程将永远不需要进行同步操作。(本质上的意思是,如果获得这个偏向锁的线程在没有其他线程竞争的情况下,又一次进入了该快同步代码块,或退出同步代码块,那么不需要再去加锁和解锁的操作。)
步骤如下:
-
-
如果一致,则说明此线程已经成功获得了锁,继续执行下面的代码.
-
如果不一致,则要检查一下对象是否还是可偏向,即“是否偏向锁”标志位的值。
-
-
-
遍历线程栈,如果存在锁记录的话,需要修复锁记录和Markword,使其变成无锁状态。
-
唤醒当前线程,将当前锁升级成轻量级锁。
可以知道,其过程就是,先把拥有偏向锁的线程停止,然后修复锁记录和markword,使锁对象变成无锁状态,然后唤醒线程,然后把线程得到锁,并将锁升级为轻量级锁。
轻量级锁:
升级轻量级锁的过程
-
-
将锁对象的对象头中的MarkWord复制到线程的刚刚创建的锁记录中。
-
将锁记录中的Owner指针指向锁对象。
-
首先,栈帧是线程私有的,与线程的生命周期相同;在栈中记录自己获取锁的记录LockRecord,记录的内容就是锁对象的对象头中的MarkWord,然后记录中的Owner指针指向锁对象,并将锁对象的markword替换为指向锁记录的指针
轻量锁的标志是00
轻量级锁的分类:自旋锁、自适应自旋锁。
自旋锁:当有另一个线程来竞争锁时,这个线程不会进入阻塞状态,而是不断的在原地循环,直到当前线程释放锁资源之后,该线程马上能得到锁。
应用场景:自旋锁应用于那些同步代码执行很快的场景,这样竞争线程能很快得到锁,而不是一直运行cpu等待。
存在的问题:
轻量级锁是非阻塞锁(原理上就没有将线程阻塞)、乐观锁
轻量级锁的好处:线程的阻塞和开启都是需要开销的,要从用户态转换为内核态,因此,如果同步代码执行很快的情况下,使用轻量级锁是一种很好的选择。
重量级锁:依赖对象内部的monitor锁来实现的,而monitor又依赖操作系统的MutexLock(互斥锁)来实现的,这个时候,锁才用到了监视器锁,升级也会经过锁撤销和升级,标志位为10。
缺点:
synchronized锁的这种升级策略使得其应用变得更加广泛了。
Lock接口
拥有的方法:
lock
lockInterruptibly
tryLock
tryLock
unlock
newCondition
其中lock()方法是获取锁,unlock()方法是释放锁
实现的类:
ReadLock in ReentrantReadWriteLock (java.util.concurrent.locks)
ReadLockView in StampedLock (java.util.concurrent.locks)
ReentrantLock (java.util.concurrent.locks)
Segment in ConcurrentHashMap (java.util.concurrent)
WriteLock in ReentrantReadWriteLock (java.util.concurrent.locks)
WriteLockView in StampedLock (java.util.concurrent.locks)
需要使用的就是ReentrantLock类和 ReentrantReadWriteLock下面的两个静态类ReadLock、WriteLock
锁的实现原理:
Lock接口的实现大多数是通过聚合了一个同步器AbstractQueuedSynchronizer来完成线程访问的。
同步器使用一个int变量(用volatile修饰)的state变量来表示同步状态,通过内置的FOFO队列来完成资源获取的排队工作。Lock接口的实现类就是通过同步器来实现的。
同步器设计是通过模板方法模式,使用者继承同步器并重写指定方法。
使用三个方法来访问和修改同步状态:getState()(获取当前同步状态)、setState()(设置当前同步状态)、compareAndSetState()(使用CAS设置当前状态,保证状态的原子性)。
这样写还是很难理解,下面就介绍一个AQS的原理
1.AQS的核心思想:如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。
上面这段话很好理解,这就是锁机制的关键
CLH:Craig、Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。
AQS使用一个Volatile的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,通过CAS完成对State值的修改。
通过上面的介绍,我们可以知道,每一个访问锁的线程被封装为一个Node,然后根据同步的状态来决定该Node如何排队,Node中的State是怎样的。
那么先介绍Node里面的属性和方法:
重要的就是WaitState、thread、prev、next(这几个方法和属性的含义一目了然,提供了封装的必要条件)
这两种模式在node里以静态final属性来实现。
witeState的一些枚举值,表示了状态的含义
以上就是Node的属性和方法的全部。
现在来看AQS里面的状态state,是一个int属性,并通过volatile来修饰。
这几个方法都是final修饰的,子类无法重写。
独占模式的流程。
从上图可以看出实现一个非公平锁的流程:首先通过CAS操作来修改状态,如果成功了就将该线程添加到setExclusiveOwnerThread里面
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
如果失败了,就调用acquire方法,acquire方法不可被子类重写,里面写了入队过程,通过addWaiter方法将节点放入队列的队尾,然后通过acquireQueued来循环获得状态
public final void acquire(int arg) { if (!tryAcquire(arg) && //tryAcquire是表示是否获取锁成功,如果成功就返回true,否则返回false,一般都是通过实现类来实现的
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail;//将pred指向队尾 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) {//使用CAS操作将节点放到队尾 pred.next = node; return node; } } enq(node); return node; }
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) {//死循环 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) {//查看当前节点的前驱是不是头结点 setHead(node);//如果是头结点, p.next = null; // help GC failed = false; return interrupted;//返回一个不中断的标志 } if (shouldParkAfterFailedAcquire(p, node) && //如果不是头结点,中断,阻塞 parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
以上就是抢占式锁的原理
Lock接口实现机制就到这里了,其实还是有些地方不清楚
下面我们从“何时出队列?”和“如何出队列?”两个方向来分析一下acquireQueued源码:
// java.util.concurrent.locks.AbstractQueuedSynchronizer final boolean acquireQueued(final Node node, int arg) { // 标记是否成功拿到资源 boolean failed = true; try { // 标记等待过程中是否中断过 boolean interrupted = false; // 开始自旋,要么获取锁,要么中断 for (;;) { // 获取当前节点的前驱节点 final Node p = node.predecessor(); // 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点) if (p == head && tryAcquire(arg)) { // 获取锁成功,头指针移动到当前node setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是p不为头结点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。具体两个方法下面细细分析 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
注:setHead方法是把当前节点置为虚节点,但并没有修改waitStatus,因为它是一直需要用的数据。
private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } // java.util.concurrent.locks.AbstractQueuedSynchronizer // 靠前驱节点判断当前线程是否应该被阻塞 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取头结点的节点状态 int ws = pred.waitStatus; // 说明头结点处于唤醒状态 if (ws == Node.SIGNAL) return true; // 通过枚举值我们知道waitStatus>0是取消状态 if (ws > 0) { do { // 循环向前查找取消节点,把取消节点从队列中剔除 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 设置前任节点等待状态为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
parkAndCheckInterrupt主要用于挂起当前线程,阻塞调用栈,返回当前线程的中断状态。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
到此,我们就理解了整个Lock接口的机制
接下来,可以聊一聊CAS的机制,CAS是一种非阻塞同步机制,
非阻塞同步:CAS(Compare and Set)比较并设置
-
输入:
-
需要读写的内存位置 V
-
我们认为这个位置现在的值 A
-
想要写入的新值 B
-
-
输出: V 位置以前的值(无论写入操作是否成功)
-
含义: 我们认为 V 处的值应该是 A,如果是,把 V 处的值改为 B,如果不是则不修改,然后把 V 处现在的值返回给我。
CAS实现原子操作的三大问题
1、ABA 问题
问题描述: V 处的值经历了 A -> B -> A
的变化后,也认为是发生了变化的,而传统的 CAS 是无法发现这种变化的。
解决方法:
-
使用
AtomicStampedReference
的int stamp
版本号判断数据是否被修改过 -
使用
AtomicMarkableReference
的boolean marked
判断数据是否被修改过
2、循环时间长开销大。
自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销
3、只能保证一个共享变量的原子操作。
当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁。还有一个取巧的办法,就是把多个共享变量合并成一个共享变量来
接下里,我们开始正式的使用Lock接口的实现类开始对代码加锁,解锁。
Lock lock = new ReentrantLock(); lock.lock(); try { // 同步代码块 } finally { lock.unlock(); // 千万不能忘记在finally块中释放锁 }
上面就是Lock使用案例
下面介绍三个高级的lock方法
trylock():轮询锁:不会阻塞的锁。
trylock(long):定时锁,是可中断锁,能抛出中断异常。
lockInterruptibly():中断锁,能在获得锁的同时,保持对中断的响应。
公平锁和非公平锁
-
-
非公平锁: 只当有线程持有锁时,新发出请求的线程才被放入队列中,如果新的线程到达时没有线程持有锁,但队列中有等待的线程(比如队列中的线程还在启动中,还没有拿到锁),这时新请求锁的线程会先于队列中的线程获取锁。
-
非公平锁性能更优的原因:
-
恢复一个被挂起的线程到这个线程真正运行起来之间,存在着巨大时时延
-
在等待队列中的线程被恢复的超长时延里,如果正好进来了一个线程获取锁,非公平锁会让这个新进来的线程先执行,它很有可以能等待队列中的线程恢复运行前就执行完了,相当于时间不变的情况下,利用等待线程的恢复运行时延,多执行了一个线程
-
-
读写锁
特点: 支持读操作并发执行,涉及到写操作时才线程间互斥执行。
方法:
-
获得读锁:
lock.readLock().lock()
-
释放读锁:
lock.readLock().unlock()
-
获得写锁:
lock.writeLock().lock()
-
释放写锁:
lock.writeLock().unlock()
Condition接口
任何一个java对象,都有一组监视器对象,包括wait()方法、wait(long)、notify()、以及notifyAll()方法。这些方法和synchronized关键字配合,可以实现等待/通知模式。
Condition接口也提供了类似的方法,与Lock接口配合可以实现等待/通知模式。
/* 获取Condition的方法 */ protected final Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); /* Condition接口中的方法 */ void await() throws InterruptedException; // 相当于wait() void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; // 相当于wait(long timeout) boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); // 相当于notify() void signalAll(); // 相当于notifyAll()
lock与synchronized 比较
锁的释放
synchronized是在JVM层面上实现的,不但可以通过一些监控工具监控synchronized的锁定,而且在代码执行时出现异常,JVM会自动释放锁定,但是使用Lock则不行,lock是通过代码实现的,要保证锁定一定会被释放,就必须将unLock()放到finally{}中
采用synchronized不需要用户去手动释放锁,当synchronized方法或者synchronized代码块执行完之后,系统会自动让线程释放对锁的占用;而Lock则必须要用户去手动释放锁,如果没有主动释放锁,就有可能导致出现死锁现象。
超时锁
如果使用 synchronized ,如果A不释放,B将一直等下去,不能被中断,如果 使用ReentrantLock,如果A不释放,可以使B在等待了足够长的时间以后,中断等待,而干别的事情
读写锁
如果多个线程都只是进行读操作,所以当一个线程在进行读操作时,其他线程只能等待无法进行读操作。通过Lock可以知道线程有没有成功获取到锁。这个是synchronized无法办到的。
效率问题
当竞争不是很激烈的时候Synchronized使用的是轻量级锁或者偏向锁,这两种锁都能有效减少轮询或者阻塞的发生,相比与Lock仍旧要将未获得锁的线程放入等待队列阻塞带来的上下文切换的开销,
此时Synchronized效率会高些,当竞争激烈的时候Synchronized会升级为重量级锁,由于Synchronized的出对速度相比Lock要慢,所以Lock的效率会更高些。一般对于数据结构设计或者框架的设计都倾向于使用Lock而非Synchronized。
公平性
synchronized块不支持公平性,任何线程一旦释放就可以获得锁定,不能指定任何偏好。我们可以通过指定公平属性来实现Lock API中的公平性。它确保最长的等待线程可以访问锁
底层实现策略
syncronized:阻塞
互斥同步最主要的问题就是进行线程阻塞和唤醒所带来的性能问题,因而这种同步又称为阻塞同步,它属于一种悲观的并发策略,即线程获得的是独占锁。独占锁意味着其他线程只能依靠阻塞来等待线程释放锁。而在CPU转换线程阻塞时会引起线程上下文切换,当有很多线程竞争锁的时候,会引起CPU频繁的上下文切换导致效率很低。synchronized采用的便是这种并发策略。
?
lock:CAS
1.在乐观的并发策略中,需要操作和冲突检测这两个步骤具备原子性,它靠硬件指令来保证,这里用的是CAS操作(Compare and Swap)。JDK1.5之后,Java程序才可以使用CAS操作。我们可以进一步研究ReentrantLock的源代码,会发现其中比较重要的获得锁的一个方法是compareAndSetState,这里其实就是调用的CPU提供的特殊指令。现代的CPU提供了指令,可以自动更新共享数据,而且能够检测到其他线程的干扰,而compareAndSet() 就用这些代替了锁定。这个算法称作非阻塞算法,意思是一个线程的失败或者挂起不应该影响其他线程的失败或挂起。
2.随着指令集的发展,我们有了另一种选择:基于冲突检测的乐观并发策略,通俗地讲就是先进性操作,如果没有其他线程争用共享数据,那操作就成功了,如果共享数据被争用,产生了冲突,那就再进行其他的补偿措施(最常见的补偿措施就是不断地重拾,直到试成功为止),这种乐观的并发策略的许多实现都不需要把线程挂起,因此这种同步被称为非阻塞同步。ReetrantLock采用的便是这种并发策略。
synchronized 和 ReentrantLock 的选择
-
选择方式:
-
只有当我们需要如下高级功能时才使用 ReentrantLock,否则优先使用 synchronized
-
可轮询、可定时、可中断的锁
-
公平锁
-
非块结构的锁
-
-
-
优先选择 synchronized 的原因:
-
Java 6开始,ReenstrantLock 和内置锁的性能相差不大
-
synchronized 是 JVM 的内置属性,未来更有可能对 synchronized 进行性能优化,如对线程封闭的锁对象的锁消除,增加锁的粒度等
-
ReenstrantLock 危险性更高(如忘记在 finally 块中 lock.unlock() 了,会导致锁永远无法被释放,出现问题,极难 debug)
-
许多现有程序中已使用了 synchronized,两种方式混合使用比较易错
锁差不多就这些内容了。下面总结了所有锁的分类
final的线程安全
对final域来说,编译器和处理器需要遵守以下规则:
1.在构造函数内对一个final域的写入和随后把这个被构造的对象的引用赋值给一个引用变量,这两个操作之间不能重排序;
什么意思,就是不能先把对象的引用赋值给引用变量之后才在构造函数内对final进行写入。
2.初次读一个final对象的引用,与随后初次读这个final域,两个操作之间不能重排序。
需要先读对象的引用,在读final域。
为什么final引用不能从构造函数内“逸出”
在引用变量为任意线程可见之前,该引用变量指向的对象final域已经在构造函数中被正确的初始化了。
但这样还不够,必须保证初始化的时候不能对任意线程可见,即在构造函数内部,不能让这个被构造的引用为其他线程所见。
如上图所示。
同步容器类
这些容器实现同步,其实就是对每一个共有方法加synchronized关键字来实现。相当于让所有对容器的访问串行操作。并发性差。
并发容器类
不会抛出 ConcorrentModificationException,它返回迭代器具有“弱一致性”,即可以容忍并发修改,但不保证将修改操作反映给容器;
size() 的返回结果可能已经过期,只是一个估计值,不过 size() 和 isEmpty() 方法在并发环境中用的也不多;
提供了许多原子的复合操作
1.V putIfAbsent(K key, V value):
在构造的时候,Segment 的数量由所谓的 concurrentcyLevel 决定,默认是 16
Segment 是基于 ReentrantLock 的扩展实现的,在 put 的时候,会对修改的区域加锁
锁分段实现的原理:
不同线程在同一数据的不同部分上不会互相干扰,例如,ConcurrentHashMap 支持 16 个并发的写入器,是用 16 个锁来实现的。它的实现原理如下:
-
使用了一个包含 16 个锁的数组,每个锁保护所有散列桶的 1/16,其中第 N 个散列桶由第(N % 16)个锁来保护;
-
这大约能把对于锁的请求减少到原来的 1/16,也是 ConcurrentHashMap 最多能支持 16 个线程同时写入的原因;
-
对于 ConcurrentHashMap 的 size() 操作,为了避免枚举每个元素,ConcurrentHashMap 为每个分段都维护了一个独立的计数,并通过每个分段的锁来维护这个值,而不是维护一个全局计数;
-
-
是否需要扩容
-
在插入元素前判断是否需要扩容,
-
比 HashMap 的插入元素后判断是否需要扩容要好,因为可以插入元素后,Map 扩容,之后不再有新的元素插入,Map就进行了一次无效的扩容
-
-
如何扩容
-
先创建一个容量是原来的2倍的数组,然后将原数组中的元素进行再散列后插入新数组中
-
为了高效,ConcurrentHashMap 只对某个 segment 进行扩容
-
-
-
关于 size 操作:
-
存在问题:如果不进行同步,只是计算所有 Segment 维护区域的 size 总和,那么在计算的过程中,可能有新的元素 put 进来,导致结果不准确,但如果对所有的 Segment 加锁,代价又过高。
-
解决方法:重试机制,通过获取两次来试图获取 size 的可靠值,如果没有监控到发生变化,即
Segment.modCount
CopyOnWriteArrayList
-
只要正确发布了这个 list,它就是不可变的了,所以随便并发访问,当需要修改时,就创建一个新的容器副本替代原来的,以实现可变性;
-
应用于迭代操作远多于修改操作的情形,如:事件通知系统,分发通知时需要迭代已注册监听器链表,并调用每一个监听器,一般注册和注销事件监听器的操作远少于接收事件通知的操作。
并发工具类
可以根据自身状态协调线程的控制流:
-
生产者消费者模式:阻塞队列(BlockingQueue)
-
并发流程控制:
-
闭锁(CountDownLatch)
-
栅栏(Barrier)
-
-
-
线程间的数据交换:交换者(Exchanger)
以上就是实现了的多种阻塞队列
阻塞队列实现的原理:(关注点是BQ是如何在队列满的时候通知put以及如何在队列空的时候通知take的)
-
-
这个 lock 给我们提供了两个 Condition:notEmpty 和 notFull;
-
put操作中,会以 while 循环的方式轮询 count == items.length,如果为 true,就 notFull.await(),这个阻塞状态需要通过 dequeue 方法中的 notFull.signal() 来解除;
-
take操作中,会以 while 循环的方式轮询 count == 0,如果为 true,就 notEmpty.await(),这个阻塞状态需要通过 enqueue 方法中的 notEmpty.signal()
public CountDownLatch(int count); // 参数count为计数值 // 调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行,或等待中线程中断 public void await() throws InterruptedException; // 和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行 public boolean await(long timeout, TimeUnit unit) throws InterruptedException; public void countDown(); // 将count值减1
CyclicBarrier 栅栏
可以让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会打开,让所有线程通过,并且这个屏障可以循环使用(这点和 CountDownLatch 很不同)。
/** * parties指让多少个线程或者任务等待至barrier状态 * barrierAction为当这些线程都达到barrier状态时会执行的内容 */ public CyclicBarrier(int parties, Runnable barrierAction); // 常用 public CyclicBarrier(int parties); public int await() throws InterruptedException, BrokenBarrierException; public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException;
// 参数permits表示许可数目,即同时可以允许多少线程进行访问,默认是非公平的 public Semaphore(int permits) { sync = new NonfairSync(permits); } // 这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可 public Semaphore(int permits, boolean fair) { sync = (fair) ? new FairSync(permits) : new NonfairSync(permits); } /* 会阻塞等待的acquire方法 */ public void acquire() throws InterruptedException; // 获取一个许可 public void acquire(int permits) throws InterruptedException; // 获取permits个许可 public void release(); // 释放一个许可 public void release(int permits); // 释放permits个许可 /* 会阻塞但不等待,立即返回的acquire方法 */ // 尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false public boolean tryAcquire() { } // 尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { } // 尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false public boolean tryAcquire(int permits) { } // 尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }
public Exchanger(); public V exchange(V x) throws InterruptedException; public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException;
线程池
为什么使用线程池
在正常的负载下,服务器应用程序应该同时表现出良好的吞吐量和快速的响应性。
当负载过载时,应用程序的性能应该逐渐降低,而不是直接失败。
如果不使用线程池,为每一个任务都创建一个线程来执行,其一,线程的创建和销毁都需要时间,其二,线程数超过cpu数,增加线程反而会降低性能,因为会出现频繁的上下文切换。
因此,使用线程池的好处就是:
1.降低资源消耗:可以重复使用已经创建好的线程。
2.提高响应速度:任务到达时,不需要等待线程创建的时间。
3.提高线程的可管理性。
线程的Executor框架
Executor 框架的主要成员:
-
ThreadPoolExecutor
-
ScheduledThreadPoolExecutor
-
Future 接口 & FutureTask 实现类
-
Executors 工厂类
Executor是一个接口,只定义了一个方法
void execute(Runnable command);
这是几乎所有需要执行的类需要的方法,历练是一个要执行的接口类,执行的run方法需要自己定义。
ExecutorService是继承了Exector的接口,里面定义了一些需要实现的方法。
// shutdown方法将执行平缓的关闭过程: // 不再接受新的任务,同时等待已经提交的任务执行完成(包括那些还未开始执行的任务) void shutdown(); // 执行粗暴的关闭过程: // 它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务 List<Runnable> shutdownNow(); boolean isshutdown(); // 返回ExecutorService是否已经终止 boolean isTerminated(); // 等待ExecutorService到达终止状态,一般调用完它之后立即调用shutdown boolean awaitTermination(long timeout,TimeUnit unit) throws InterruptedException; // ...
对于线程池的实现类:ThreadPoolExecutor,它继承了抽象类AbstractExecutorService,而该抽象类则实现了ExecutorService接口。
这个类的一些方法就不多做描述了,具体线程池的处理步骤如下:
下面是Executor框架的执行框图:
Executor框架由三大部分组成:
1.任务,被执行的任务需要实现的接口:Runnable接口。Callable接口。
2.任务的执行:包括执行的核心接口Executor,以及继承这些接口的实现类。
3.异步计算的结果:包括Future接口和接口的实现类。
Future接口以及实现类:
Executor执行的任务有4个生命周期阶段:创建、提交、开始和完成。
Future表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。
public incerface Future<V>{ boolean cance1(boolean mayInterruptIfRunning); boolean iscancelled(); boolean isDone(); V get() throws InterruptedException,ExecutionException, CancellationException; V get(long timeout,TimeUnit unit) throws InterruptedException,ExecutionException, CancellationException,TimeoutException; }
Future需要自己实现,提供了一个FutureTask的实现类
状态迁移
执行过程
Future 的 get 方法对于任务的状态的不同表现:
-
任务已完成:立即返回结果或抛出异常。
-
任务未完成:阻塞直到任务完成。
-
任务抛出异常:将异常封装为 ExecutionException 后再次抛出,ExecutionException 异常可以通过 getCause() 方法获取被封装的初始异常。
-
任务被取消:抛出 CancallationException 异常,这是个 RuntimeException,需要显式 catch。
1)线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。
2)线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。
分类
FixedThreadPool
特点: 固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,如果某个线程由于发生了未预期的 Exception 而结束,那么线程池会补充一个新的线程。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, // 线程池大小不可扩展 0L, TimeUnit.MILLISECONDS, // 多余线程会被立即终止 new LinkedBlockingQueue<Runnable>()); // 使用容量为 Integer.MAX_VALUE 的工作队列 // 由于使用了无界队列,不会拒绝任务,所以不会调用 handler }
CacheThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, // 初始为0,线程池中的线程数是无界的 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
-
池中不会有空闲线程,也不会有等待的线程
-
一旦任务到达的速度大于线程池处理任务的速度,就会创建一个新的线程给任务
-
与另外两个线程池不同的地方在于,这个工作队列并不是用来放还没有执行的任务的,而是用来放执行过任务后空闲下的线程的,空闲下来的线程会被:
SynchronousQueue#poll(keepAliveTime, TimeUnit.NANOSECONDS)
SingleThreadPool
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, // 线程池的大小固定为1 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); // 使用容量为 Integer.MAX_VALUE 的工作队列 }
线程池特点:
-
-
将线程池的 corePoolSize 设置为 0 且不使用 SynchronousQueue 作为工作队列会产生的奇怪行为:只有当线程池的工作队列被填满后,才会开始执行任务
-
产生原因:如果线程池中的线程数量等于线程池的基本大小,那么仅当在工作队列已满的情况下ThreadPoolExecutor才会创建新的线程,如果线程池的基本大小为零并且其工作队列有一定的容量,那么当把任务提交给该线程池时,
-
ScheduledThreadPoolExecutor
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
实现原理:
-
使用 DelayWorkQueue 作为工作队列,ScheduledThreadPoolExecutor 会把待执行的任务 ScheduledFutureTask 放到工作队列中
-
ScheduledFutureTask 中有以下 3 个主要的成员变量:
-
long time:表示该任务将要被执行的具体时间;
-
long sequenceNumber:表示任务被添加到 ScheduledThreadPoolExecutor 中的序号;
-
long period:表示任务执行的间隔周期。
-
-
任务执行的过程:
-
线程从 DelayWorkQueue 中获取到期的任务;
-
执行这个任务;
-
修改这个任务的 time 为下一次的执行时间;
-
将该任务再次 add 进 DelayWorkQueue。
-
对比 Timer(Timer 的缺陷)
-
Timer 在执行所有定时任务时只会创建一个线程。如果有一个任务执行时间太长导致它后面的任务超时,那么后面超时的任务会立即执行,从而破坏了其他 TimerTask 的准时执行。线程池能弥补这个缺陷,因为它可以提供多个线程来执行延时任务和周期任务。
-
线程泄漏:Timer 线程并不捕获未检查异常,当 TimerTask 抛出未检查的异常时将终止定时线程。这种情况下,整个 Timer都会被取消,将导致已经被调度但尚未执行的 TimerTask 将不会再执行,新的任务也不能被调度。
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); // 抛异常! } }
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { //这里什么都不干 } }
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { // 先判断线程池关没 e.getQueue().poll(); // 丢到等待队列中下一个要被执行的任务 e.execute(r); // 重新尝试提交新来的任务 } } }
-
-
如当主线程提交了任务时,任务队列已满,此时该任务会在主线程中执行。这样主线程在一段时间内不会提交任务给线程池,使得工作者线程有时间来处理完正在执行的任务
-
可以实现服务器在高负载下的性能缓慢降低
-
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { // 直接在把它提交来的线程调用它的 run 方法,相当于没有新建一个线程来执行它, // 而是直接在提交它的线程执行它,这样负责提交任务的线程一段时间内不会提交新的任务来 r.run(); } } }
线程池大小的设置
-
计算密集型任务: N = N_cpu + 1
-
加 1 的原因:当有一个线程偶尔故障时,额外的那个线程可以立即补上,保证CPU时钟不会被浪费
-
-
包含 I/O 或其他阻塞操作: N = N_cpu * U_cpu * (1 + W / C)
-
N_cpu:CPU 的个数
-
U_cpu:目标 CPU 利用率
-
W / C:等待时间 (Wait) / 计算时间 (Compute)
-
获取 CPU 数目的方法:
int N_CPUS = Runtime.getRuntime().availableProcessors();
-
生产者生产数据到缓冲区中,消费者从缓冲区中取数据。
如果缓冲区已经满了,则生产者线程阻塞;
如果缓冲区为空,那么消费者线程阻塞。
方式一:
package com.liuxinghang.bingfa; public class ProducerAndConsumer { public static void main(String[] args) { Resource resource=new Resource(); ProducerThread p1=new ProducerThread(resource); ConsumerThread c1=new ConsumerThread(resource); ConsumerThread c2=new ConsumerThread(resource); p1.start(); c1.start(); c2.start(); } } //资源类 class Resource{ //当前的资源数 private int num=0; //可以存储的资源数 private int size=10; //添加资源 public synchronized void add(){ if(num<size)//没有满的时候 { num++; System.out.println(Thread.currentThread().getName() + "生产一件资源,当前资源池有" + num + "个"); //因为已经生产了,现在可以通知消费者来消费了 notifyAll(); }else {//资源已经满了 try { wait();//生产者线程阻塞,因为已经满了 System.out.println(Thread.currentThread().getName()+"线程进入等待"); } catch (InterruptedException e) { e.printStackTrace(); } } } public synchronized void remove(){ if(num>0)//资源不为空,可以消费 { num--; System.out.println(Thread.currentThread().getName() + "消费一件资源,当前资源池有" + num + "个"); //已经消费了一件了,资源不会为满,因此可以通知生产者来生产了。 notifyAll(); }else {//资源已经空了 try { wait();//消费者线程阻塞,因为已经空了 System.out.println(Thread.currentThread().getName()+"线程进入等待"); } catch (InterruptedException e) { e.printStackTrace(); } } } } //生产者的线程,只做一件事,就是添加资源 class ProducerThread extends Thread{ private Resource resource; public ProducerThread(Resource resource){ this.resource = resource; } @Override public void run() { resource.add(); } } //消费者的线程,只做一件事,就是添加资源 class ConsumerThread extends Thread{ private Resource resource; public ConsumerThread(Resource resource){ this.resource = resource; } @Override public void run() { resource.remove(); } }
方式二
package com.liuxinghang.a; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ProducerAndConsumer1 { public static void main(String[] args) { Lock lock = new ReentrantLock(); Condition producerCondition = lock.newCondition(); Condition consumerCondition = lock.newCondition(); Resource1 resource = new Resource1(lock,producerCondition,consumerCondition); ProducerThread p1=new ProducerThread(resource); ConsumerThread c1=new ConsumerThread(resource); ConsumerThread c2=new ConsumerThread(resource); p1.start(); c1.start(); c2.start(); } } class Resource1{ private int num=0;//当前资源数 private int size=10;//资源总数 //锁 private Lock lock; private Condition producerCondition; private Condition consumerCondition; public Resource1(Lock lock, Condition producerCondition, Condition consumerCondition) { this.lock = lock; this.producerCondition = producerCondition; this.consumerCondition = consumerCondition; } public void add(){ //添加之前加锁 lock.lock(); try { if (num < size) { num++; System.out.println(Thread.currentThread().getName() + "生产一件资源,当前资源池有" + num + "个"); //生产了商品可以唤醒消费者了 consumerCondition.signalAll(); } else { //让生产者线程阻塞 try { producerCondition.await(); System.out.println(Thread.currentThread().getName() + "线程进入等待"); } catch (InterruptedException e) { e.printStackTrace(); } } }finally { lock.unlock(); } } public void remove(){ //添加之前加锁 lock.lock(); try { if (num >0) { num--; System.out.println(Thread.currentThread().getName() + "消费一件资源,当前资源池有" + num + "个"); //消费了商品可以唤醒生产者了 producerCondition.signalAll(); } else { //让消费者者线程阻塞 try { consumerCondition.await(); System.out.println(Thread.currentThread().getName() + "线程进入等待"); } catch (InterruptedException e) { e.printStackTrace(); } } }finally { lock.unlock(); } } } //生产者的线程,只做一件事,就是添加资源 class ProducerThread extends Thread{ private Resource1 resource; public ProducerThread(Resource1 resource){ this.resource = resource; } @Override public void run() { resource.add(); } } //消费者的线程,只做一件事,就是添加资源 class ConsumerThread extends Thread{ private Resource1 resource; public ConsumerThread(Resource1 resource){ this.resource = resource; } @Override public void run() { resource.remove(); } }
方式三:
package com.liuxinghang.a; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class BlockingQueueConsumerProducer { public static void main(String[] args) { Resource resource=new Resource(); ProducerThread1 p = new ProducerThread1(resource); ConsumerThread1 c=new ConsumerThread1(resource); p.start(); c.start(); } } class Resource{ BlockingQueue resourceQueue=new LinkedBlockingQueue(10);//队列里面可以加入10个资源 public void add(){ try { resourceQueue.put(1); System.out.println("生产者" + Thread.currentThread().getName() + "生产一件资源," + "当前资源池有" + resourceQueue.size() + "个资源"); } catch (InterruptedException e) { e.printStackTrace(); } } public void remove(){ try { resourceQueue.take(); System.out.println("消费者" + Thread.currentThread().getName() + "消耗一件资源," + "当前资源池有" + resourceQueue.size() + "个资源"); } catch (InterruptedException e) { e.printStackTrace(); } } } class ProducerThread1 extends Thread{ private Resource resource; public ProducerThread1(Resource resource) { this.resource = resource; } @Override public void run() { //添加 while(true){ resource.add(); } } } class ConsumerThread1 extends Thread{ private Resource resource; public ConsumerThread1(Resource resource) { this.resource = resource; } @Override public void run() { //添加 // while(true){ resource.remove(); // } } }
线程池设计
线城池就是以一个或多个线程来执行多个应用程序的线程集合
方案一:
编写一个线程池接口
public interface ThreadPool <Job extends Runnable> { //执行一个任务(Job),这个Job必须实现Runnable void execute(Job job); //关闭线程池 void shutdown(); //增加工作者线程,即用来执行任务的线程 void addWorkers(int num); //减少工作者线程 void removeWorker(int num); //获取正在等待执行的任务数量 int getJobSize(); }
概念:Job是一个任务,它继承了Runnable接口,可以用户自己定义自己要执行的任务,把任务放在run方法里面。
工作者线程:也是一个任务,只是每一个工作者都在做一件事,就是从job队列中取出job来,然后将该job运行。
当然这里面就涉及到可能job队列是空的,没办法取到,因此,需要等待唤醒机制。
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> { //线程池维护工作线程的最大数量 private static final int MAX_WORKER_NUMBERS=10; //线程池维护工作线程的默认值 private static final int DEFAULT_WORKER_NUMBERS=5; //线程池维护工作线程的最小数量 private static final int MIN_WORKER_NUMBERS=1; //维护一个工作列表,加入客户端发起的工作任务 private final LinkedList<Job> jobs=new LinkedList<Job>(); //工作者线程的列表 private final List<Worker> workers= Collections.synchronizedList(new ArrayList<Worker>()); //工作者线程的数量 private int workerNum; //每个工作者线程编号的生成 private AtomicLong threadNum=new AtomicLong(); //生成默认的线程池 public DefaultThreadPool(){ this.workerNum=DEFAULT_WORKER_NUMBERS; initializeWorkers(this.workerNum); } public DefaultThreadPool(int num){ if(num>MAX_WORKER_NUMBERS){ this.workerNum=DEFAULT_WORKER_NUMBERS; }else { this.workerNum=num; } initializeWorkers(this.workerNum); } //初始化每一个工作者的线程 private void initializeWorkers(int num){ for(int i=0;i<num;i++){ Worker worker=new Worker(); workers.add(worker); Thread thread=new Thread(worker); thread.start(); } } //执行job @Override public void execute(Job job) { if(job!=null){ synchronized (jobs){//这里使用生产者消费者模式 jobs.addLast(job); jobs.notifyAll(); } } } @Override public void shutdown() { for(Worker worker:workers){ worker.shutdown(); } } @Override public void addWorkers(int num) { //加锁,防止该线程还没增加完成而下个线程继续增加导致工作者线程超过最大值 synchronized (jobs){ if(num + this.workerNum > MAX_WORKER_NUMBERS){ num = MAX_WORKER_NUMBERS - this.workerNum; } initializeWorkers(num); this.workerNum+=num; } } @Override public void removeWorker(int num) { synchronized (jobs) { if(num>=this.workerNum){ throw new IllegalArgumentException("超过了已有的线程数量"); } for (int i = 0; i < num; i++) { Worker worker = workers.get(i); if (worker != null) { //关闭该线程并从列表中移除 worker.shutdown(); workers.remove(i); } } this.workerNum -= num; } } @Override public int getJobSize() { return workers.size(); } //定义一个工作者的线程类 class Worker implements Runnable{ //表示是否运行该worker private volatile boolean running=true; @Override public void run() { while(running){ Job job=null; //线程的等待通知机制 synchronized (jobs){ if(jobs.isEmpty()){ try { jobs.wait(); } catch (InterruptedException e) { //感知到外部对该线程的中断操作,返回 Thread.currentThread().interrupt(); return; } } //如果不为空,从job里面取出一个job(job是任务) job=jobs.removeFirst(); } //执行job if(job!=null){ job.run(); } } } public void shutdown(){ running=false; } } }
方案二:通过ThreadGroup来创建
线程池一般需要一个线程管理类: ThreadPoolManager,其作用有:
1)提供创建一定数量的线程的方法。主线程调用该方法,从而创建线程。创建的线程执行自己的例程,线程的例程阻塞在任务抓取上。
2)提供对任务队列的操作的方法。主线程调用初始化任务队列的方法,然后在有任务的时候,调用提供的任务添加方法,将任务添入等待队列。当主线程调用任务的添加方法时,会触发等待的线程,从而使得阻塞的线程被唤醒,其抓取任务,并执行任务。
线程池需要一个任务队列: List<Task>,其作用有:
提供任务的增删方法。而且该任务队列需要进行排他处理,防止多个工作线程对该任务队列进行同时的抓取操作或者主线程的加入与工作线程的抓取的并发操作。
线程池需要一个类似信号量的通知机制:wait -notify:
工作线程调用wait阻塞在任务抓取上。主线程添加任务后,调用notify触发阻塞的线程。
线程池需要一个线程类:WorkThread,其作用有:
提供线程的例程。创建线程WorkThread后,需要抓取任务,并执行任务。这是线程的例程。
线程池需要一个任务类:Task,其作用有:
提供线程抓取并执行的任务目标。
以上是关于java并发总结的主要内容,如果未能解决你的问题,请参考以下文章
经验总结:Java高级工程师面试题-字节跳动,成功跳槽阿里!
全栈编程系列SpringBoot整合Shiro(含KickoutSessionControlFilter并发在线人数控制以及不生效问题配置启动异常No SecurityManager...)(代码片段