java基础Java并发包
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java基础Java并发包相关的知识,希望对你有一定的参考价值。
Java并发包
一、并发包的结构
java并发包中共分为五类:
1、集合框架:包括队列和并发集合
2、同步辅助类
3、线程池
4、Lock锁
5、原子类
二、详解部分
1、同步辅助类详解部分
(1)CountDownLatch
举例:有三个工作,只有三个工作都完成,任务才算执行完成。
1 import java.util.concurrent.CountDownLatch; 2 3 /** 4 * 5 * @author qiuxiang 6 * 7 */ 8 public class CountDownLatchDemo { 9 public static void main(String[] args) throws InterruptedException { 10 CountDownLatch latch = new CountDownLatch(3); 11 Worker w1 = new Worker("quinns",latch); 12 Worker w2 = new Worker("tengyunhao",latch); 13 Worker w3 = new Worker("jude",latch); 14 w1.start(); 15 w2.start(); 16 w3.start(); 17 latch.await(); 18 System.out.println("Main thread done"); 19 } 20 } 21 22 class Worker extends Thread{ 23 private String workerName; 24 private CountDownLatch latch; 25 public Worker(String workerName,CountDownLatch latch) { 26 this.workerName = workerName; 27 this.latch = latch; 28 } 29 @Override 30 public void run() { 31 try { 32 System.out.println("Worker:"+workerName+" is begin"); 33 Thread.sleep(10000L); 34 System.out.println("Worker:"+workerName+" is end"); 35 } catch (Exception e) { 36 // TODO: handle exception 37 } 38 latch.countDown(); 39 } 40 }
运行结果:
它相当于一个计数器。用给定的数值初始化CountDownLatch,之后计数器从这个值开始倒计数,直到计数值达到零。
CountDownLatch是通过“共享锁”实现的。在创建时传递的参数代表“锁计数器”的初始状态,表示该“共享锁”最多能被多少个线程同时获取,这个值只能设置一次。
主线程必须在启动其他线程后立即调用await()方法。这样主线程的操作就会被这个方法阻塞,直到其他的线程完成各自的任务。
(2)CyclicBarrier
1 import java.util.concurrent.CyclicBarrier; 2 3 /** 4 * 5 * @author qiuxiang 6 * 7 */ 8 public class CyclicBarrierDemo { 9 public static void main(String[] args) { 10 CyclicBarrier barrier = new CyclicBarrier(3,new TotalTask()); 11 BillTask worker1 = new BillTask("1", barrier); 12 BillTask worker2 = new BillTask("2", barrier); 13 BillTask worker3 = new BillTask("3", barrier); 14 worker1.start(); 15 worker2.start(); 16 worker3.start(); 17 System.out.println("Main Thread End"); 18 19 } 20 static class TotalTask extends Thread{ 21 @Override 22 public void run() { 23 System.out.println("所有子任务都执行完了,就开始执行主任务"); 24 } 25 } 26 static class BillTask extends Thread{ 27 String billName; 28 CyclicBarrier barrier; 29 public BillTask(String workerName,CyclicBarrier barrier){ 30 this.billName = workerName; 31 this.barrier = barrier; 32 } 33 @Override 34 public void run() { 35 try { 36 System.out.println("市区:"+billName+"运算开始"); 37 Thread.sleep(1000L); 38 System.out.println("市区:"+billName+"运算完成,等待中。。。。"); 39 barrier.await();//等待所有线程都调用过此函数,才能进行后续动作。 40 System.out.println("全部都结束,市区"+billName+"才开始后面的工作。"); 41 } catch (Exception e) { 42 // TODO: handle exception 43 } 44 } 45 } 46 }
运行结果:
这个类是一个重复利用的屏障类。它允许一组线程相互等待,直到全部到达某个公共屏障点,然后所有的这组线程再同步往后执行。
await()函数每被调用一次,计数便会减1,并阻塞住当前线程。当计数减至0时,阻塞解除。
(3)Semaphore
可以控制某个资源可被同时访问的个数,通过构造函数设定一定数量的许可,通过acquire()获得一个许可,如果没有就等待,而release()释放一个许可。
举例:只允许5个线程用时进入执行acquire()和release()之间的代码
1 import java.util.Random; 2 import java.util.concurrent.ExecutorService; 3 import java.util.concurrent.Executors; 4 import java.util.concurrent.Semaphore; 5 6 public class SemaphoreDemo { 7 public static void main(String[] args) { 8 //线程池 9 ExecutorService exec = Executors.newCachedThreadPool(); 10 //只能5个线程同时访问 11 final Semaphore semp = new Semaphore(5); 12 //模拟20个客户端访问 13 for (int i = 0; i < 20; i++) { 14 15 final int No = i; 16 Runnable run = new Runnable() { 17 public void run() { 18 try { 19 //获取许可 20 Random rad = new Random(); 21 semp.acquire(); 22 System.out.println("Accessing:"+No); 23 Thread.sleep(rad.nextInt(10000)); 24 //访问完后,释放,如果屏蔽下面的语句,则在控制台只能打印5条记录,之后线程一直阻塞。 25 // semp.release(); 26 } catch (Exception e) { 27 // TODO: handle exception 28 } 29 } 30 }; 31 //退出线程池 32 exec.execute(run); 33 } 34 } 35 36 }
2、并发框架详解部分
2.1 阻塞队列详解部分
阻塞队列是一个支持两个附加操作的队列。这两个附加操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,消费者只从容器里拿元素。
(1)ArrayBlockingQueue
ArrayBlockingQueue是一个由数组支持的有界缓存的阻塞队列。在读写操作上都需要锁住整个容器,因此吞吐量与一般的实现是相似的,适合于实现是相似的,适合于实现“生产消费者”模式。ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。这个类是线程安全的。生产者和消费者公用一把锁。
(2)LinkedBlockingQueue
基于链表的阻塞队列,内部维持着一个数据缓存队列(该队列由链表构成)。
只有当队列缓冲区达到最大值缓冲容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者线程,直到消费者从队列中消耗掉一份数据,生产者线程会被唤醒,反之对于消费者这端的的处理也基于同样原理。
LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
ArrayBlockingQueue和LinkedBlockingQueue的区别:
1、队列大小的初始化方式不同
ArrayBlockingQueue是有界的,必须指定队列的大小;
LinkedBlockingQueue需要分情况,指定大小时,就是有界的;不指定队列的大小时,默认是Integer.MAX_VALUE,看成无界队列,但当生产速度大于消费速度时候,有可能会内存溢出。
2、队列中锁的实现
ArrayBlockingQueue实现的队列中的锁是没有分离的,即生产者和消费者公用同一个锁;进行put和take操作,共用同一个锁对象。既put和take无法并行执行。
LinkedBlockingQueue实现的队列中的锁是分离的,即生产用的是putLock,消费是takeLock。即读写各持一个锁,避免了读(take)写(put)时互相进行竞争锁的情况,可并行执行。
3、在生产和消费时操作不同
ArrayBlockingQueue基于数组,在插入或删除元素时,是直接将枚举对象插入或移除的,不会产生或销毁任何额外的对象实例;
LinkedBlockingQueue基于链表,在插于或删除元素时,需要把枚举对象转换为Node<E>进行插入或移除,会生成一个额外的Node对象,这在长时间内需要高效并发的处理大批量数据的系统中,其对于GC的影响还是存在一定的区别,会影响性能。
put()和take()
都可以实现阻塞的功能。
put()方法:把元素加入到阻塞队列中,如果阻塞队列没有空间,则调用此方法的线程被阻塞,直到有空间的时候再继续。
take()方法:取出排在阻塞队列首位的对象,若阻塞队列为空,则调用此方法的线程被阻塞,直到有新的对象加入的时候再继续。
offer()和poll()
不具有阻塞的功能。
offer()方法:把元素加入到阻塞队列中,如果可以容纳,则返回true。如果不可以容纳,则返回false。
poll()方法:取出排在阻塞队列首位的对象,若阻塞队列为空,则返回null,如果不为空,则返回取出来的那个元素。
(3)PriorityBlockingQueue和PriorityQueue
此阻塞队列为基于数组的无界阻塞队列。它会按照元素的优先级对元素进行排序,按照优先级顺序出对,每次出对的元素都是优先级最高的元素。注意,不会阻塞生产者,但会阻塞消费者。PriorityBlockingQueue里面存储的对象必须是实现Comparable接口,队列通过这个接口的compare方法确定对象的priority。
队列的元素并不是全部按优先级排序的,但是队首的优先级肯定是最高的。每取一个头元素时候,都会对剩余的元素做一次调整,这样就能保证每次队首的元素都是优先级最高的元素。
(4)DelayQueue
DelayQueue是一个无界阻塞队列,用于放置实现了Delayed接口的对象,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的Delayed元素。这个队列里面所存储的对象都有一个时间参数,采用take获取数据时,如果时间没到,取不出来任何数据。而加入数据的时候,是不会阻塞的(不阻塞生产者,阻塞消费者)。DelayQueue内部使用PriorityQueue实现的,是一个使用PriorityQueue实现的BlockingQueue,优先队列的比较基准值是时间。本质即:DelayQueue = BlockingQueue + PriorityQueue + Delayed。
如果不使用DelayQueue,解决办法就是:使用一个后台线程,遍历所有对象,但是在对象数量过多时,可能存在性能问题,检查间隔时间不好设置,间隔时间过大,影响精度,过小会影响效率。
应用场景:缓存系统的统计。缓存的对象,超过了有效时间,需要从缓存中移出。使用一个线程循环查询DelayQueue,一旦在DelayQueue中获取元素,表示缓存有效期到了。
(5)SynchronousQueue
同步队列是一个不存储元素的队列,它的size()方法总是返回0。每个线程的插入操作必须等待另一个线程的移除操作,同样任何一个线程的移除操作都必须等待另一个线程的插入操作。可以认为SynchronousQueue是一个缓存值为1的阻塞队列。
2.2 框架详解部分
(1)因为多线程环境下,使用Hashmap进行put操作会引起死循环,导致CPU利用率接近100%,所以在并发情况下不能使用HashMap。
(2)HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下。因为当一个线程访问HashTable的同步方法时,其他线程访问HashTable的同步方法时,可能会进入阻塞或轮询状态。如线程1使用put进行添加元素,线程2不但不能使用put方法添加元素,并且也不能使用get方法来获取元素,所以竞争越激烈效率越低。
ConcurrentHashMap的锁分段技术
HashTable容器在竞争激烈的并发环境下表现出效率低下的原因,是因为所有访问HashTable的线程都必须竞争同一把锁,那假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效的提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术,首先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。
ConcurrentHashMap的结构
我们通过ConcurrentHashMap的类图来分析ConcurrentHashMap的结构。
ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁ReentrantLock,在ConcurrentHashMap里扮演锁的角色,HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构, 一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素, 每个Segment守护者一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。
ConcurrentHashMap的初始化
ConcurrentHashMap初始化方法是通过initialCapacity,loadFactor, concurrencyLevel几个参数来初始化segments数组,段偏移量segmentShift,段掩码segmentMask和每个segment里的HashEntry数组。
初始化segments数组。让我们来看一下初始化segmentShift,segmentMask和segments数组的源代码。
1 if (concurrencyLevel > MAX_SEGMENTS) 2 3 concurrencyLevel = MAX_SEGMENTS; 4 5 // Find power-of-two sizes best matching arguments 6 7 int sshift = 0; 8 9 int ssize = 1; 10 11 while (ssize < concurrencyLevel) { 12 13 ++sshift; 14 15 ssize <<= 1; 16 17 } 18 19 segmentShift = 32 - sshift; 20 21 segmentMask = ssize - 1; 22 23 this.segments = Segment.newArray(ssize);
由上面的代码可知segments数组的长度ssize通过concurrencyLevel计算得出。为了能通过按位与的哈希算法来定位segments数组的索引,必须保证segments数组的长度是2的N次方(power-of-two size),所以必须计算出一个是大于或等于concurrencyLevel的最小的2的N次方值来作为segments数组的长度。假如concurrencyLevel等于14,15或16,ssize都会等于16,即容器里锁的个数也是16。注意concurrencyLevel的最大大小是65535,意味着segments数组的长度最大为65536,对应的二进制是16位。
初始化segmentShift和segmentMask。这两个全局变量在定位segment时的哈希算法里需要使用,sshift等于ssize从1向左移位的次数,在默认情况下concurrencyLevel等于16,1需要向左移位移动4次,所以sshift等于4。segmentShift用于定位参与hash运算的位数,segmentShift等于32减sshift,所以等于28,这里之所以用32是因为ConcurrentHashMap里的hash()方法输出的最大数是32位的,后面的测试中我们可以看到这点。segmentMask是哈希运算的掩码,等于ssize减1,即15,掩码的二进制各个位的值都是1。因为ssize的最大长度是65536,所以segmentShift最大值是16,segmentMask最大值是65535,对应的二进制是16位,每个位都是1。
初始化每个Segment。输入参数initialCapacity是ConcurrentHashMap的初始化容量,loadfactor是每个segment的负载因子,在构造方法里需要通过这两个参数来初始化数组中的每个segment。
1 if (initialCapacity > MAXIMUM_CAPACITY) 2 3 initialCapacity = MAXIMUM_CAPACITY; 4 5 int c = initialCapacity / ssize; 6 7 if (c * ssize < initialCapacity) 8 9 ++c; 10 11 int cap = 1; 12 13 while (cap < c) 14 15 cap <<= 1; 16 17 for (int i = 0; i < this.segments.length; ++i) 18 19 this.segments[i] = new Segment<K,V>(cap, loadFactor);
上面代码中的变量cap就是segment里HashEntry数组的长度,它等于initialCapacity除以ssize的倍数c,如果c大于1,就会取大于等于c的2的N次方值,所以cap不是1,就是2的N次方。segment的容量threshold=(int)cap*loadFactor,默认情况下initialCapacity等于16,loadfactor等于0.75,通过运算cap等于1,threshold等于零。
定位Segment
既然ConcurrentHashMap使用分段锁Segment来保护不同段的数据,那么在插入和获取元素的时候,必须先通过哈希算法定位到Segment。可以看到ConcurrentHashMap会首先使用Wang/Jenkins hash的变种算法对元素的hashCode进行一次再哈希。
1 private static int hash(int h) { 2 3 h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); 4 5 h += (h << 3); h ^= (h >>> 6); 6 7 h += (h << 2) + (h << 14); return h ^ (h >>> 16); 8 9 }
再哈希,其目的是为了减少哈希冲突,使元素能够均匀的分布在不同的Segment上,从而提高容器的存取效率。假如哈希的质量差到极点,那么所有的元素都在一个Segment中,不仅存取元素缓慢,分段锁也会失去意义。我做了一个测试,不通过再哈希而直接执行哈希计算。
1 System.out.println(Integer.parseInt("0001111", 2) & 15); 2 3 System.out.println(Integer.parseInt("0011111", 2) & 15); 4 5 System.out.println(Integer.parseInt("0111111", 2) & 15); 6 7 System.out.println(Integer.parseInt("1111111", 2) & 15);
计算后输出的哈希值全是15,通过这个例子可以发现如果不进行再哈希,哈希冲突会非常严重,因为只要低位一样,无论高位是什么数,其哈希值总是一样。我们再把上面的二进制数据进行再哈希后结果如下,为了方便阅读,不足32位的高位补了0,每隔四位用竖线分割下。
1 0100|0111|0110|0111|1101|1010|0100|1110 2 3 1111|0111|0100|0011|0000|0001|1011|1000 4 5 0111|0111|0110|1001|0100|0110|0011|1110 6 7 1000|0011|0000|0000|1100|1000|0001|1010
可以发现每一位的数据都散列开了,通过这种再哈希能让数字的每一位都能参加到哈希运算当中,从而减少哈希冲突。ConcurrentHashMap通过以下哈希算法定位segment。
默认情况下segmentShift为28,segmentMask为15,再哈希后的数最大是32位二进制数据,向右无符号移动28位,意思是让高4位参与到hash运算中, (hash >>> segmentShift) & segmentMask的运算结果分别是4,15,7和8,可以看到hash值没有发生冲突。
1 final Segment<K,V> segmentFor(int hash) { 2 3 return segments[(hash >>> segmentShift) & segmentMask]; 4 5 }
ConcurrentHashMap的get操作
Segment的get操作实现非常简单和高效。先经过一次再哈希,然后使用这个哈希值通过哈希运算定位到segment,再通过哈希算法定位到元素,代码如下:
1 public V get(Object key) { 2 3 int hash = hash(key.hashCode()); 4 5 return segmentFor(hash).get(key, hash); 6 7 }
get操作的高效之处在于整个get过程不需要加锁,除非读到的值是空的才会加锁重读,我们知道HashTable容器的get方法是需要加锁的,那么ConcurrentHashMap的get操作是如何做到不加锁的呢?原因是它的get方法里将要使用的共享变量都定义成volatile,如用于统计当前Segement大小的count字段和用于存储值的HashEntry的value。定义成volatile的变量,能够在线程之间保持可见性,能够被多线程同时读,并且保证不会读到过期的值,但是只能被单线程写(有一种情况可以被多线程写,就是写入的值不依赖于原值),在get操作里只需要读不需要写共享变量count和value,所以可以不用加锁。之所以不会读到过期的值,是根据java内存模型的happen before原则,对volatile字段的写入操作先于读操作,即使两个线程同时修改和获取volatile变量,get操作也能拿到最新的值,这是用volatile替换锁的经典应用场景。
1 transient volatile int count; 2 3 volatile V value;
在定位元素的代码里我们可以发现定位HashEntry和定位Segment的哈希算法虽然一样,都与数组的长度减去一相与,但是相与的值不一样,定位Segment使用的是元素的hashcode通过再哈希后得到的值的高位,而定位HashEntry直接使用的是再哈希后的值。其目的是避免两次哈希后的值一样,导致元素虽然在Segment里散列开了,但是却没有在HashEntry里散列开。
1 hash >>> segmentShift) & segmentMask//定位Segment所使用的hash算法 2 3 int index = hash & (tab.length - 1);// 定位HashEntry所使用的hash算法
ConcurrentHashMap的Put操作
由于put方法里需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量时必须得加锁。Put方法首先定位到Segment,然后在Segment里进行插入操作。插入操作需要经历两个步骤,第一步判断是否需要对Segment里的HashEntry数组进行扩容,第二步定位添加元素的位置然后放在HashEntry数组里。
是否需要扩容。在插入元素前会先判断Segment里的HashEntry数组是否超过容量(threshold),如果超过阀值,数组进行扩容。值得一提的是,Segment的扩容判断比HashMap更恰当,因为HashMap是在插入元素后判断元素是否已经到达容量的,如果到达了就进行扩容,但是很有可能扩容之后没有新元素插入,这时HashMap就进行了一次无效的扩容。
如何扩容。扩容的时候首先会创建一个两倍于原容量的数组,然后将原数组里的元素进行再hash后插入到新的数组里。为了高效ConcurrentHashMap不会对整个容器进行扩容,而只对某个segment进行扩容。
ConcurrentHashMap的size操作
如果我们要统计整个ConcurrentHashMap里元素的大小,就必须统计所有Segment里元素的大小后求和。Segment里的全局变量count是一个volatile变量,那么在多线程场景下,我们是不是直接把所有Segment的count相加就可以得到整个ConcurrentHashMap大小了呢?不是的,虽然相加时可以获取每个Segment的count的最新值,但是拿到之后可能累加前使用的count发生了变化,那么统计结果就不准了。所以最安全的做法,是在统计size的时候把所有Segment的put,remove和clean方法全部锁住,但是这种做法显然非常低效。
因为在累加count操作过程中,之前累加过的count发生变化的几率非常小,所以ConcurrentHashMap的做法是先尝试2次通过不锁住Segment的方式来统计各个Segment大小,如果统计的过程中,容器的count发生了变化,则再采用加锁的方式来统计所有Segment的大小。
那么ConcurrentHashMap是如何判断在统计的时候容器是否发生了变化呢?使用modCount变量,在put , remove和clean方法里操作元素前都会将变量modCount进行加1,那么在统计size前后比较modCount是否发生变化,从而得知容器的大小是否发生变化。
3、Locks详解部分
(1)Lock
首先要说明的就是Lock,通过查看Lock的源码可知,Lock是一个接口:
1 public interface Lock { 2 void lock(); 3 void lockInterruptibly() throws InterruptedException; 4 boolean tryLock(); 5 boolean tryLock(long time, TimeUnit unit) throws InterruptedException; 6 void unlock(); 7 Condition newCondition(); 8 }
下面来逐个讲述Lock接口中每个方法的使用,lock()、tryLock()、tryLock(long time, TimeUnit unit)和lockInterruptibly()是用来获取锁的。unLock()方法是用来释放锁的。
在Lock中声明了四个方法来获取锁,那么这四个方法有何区别呢?
首先lock()方法是平常使用得最多的一个方法,就是用来获取锁。如果锁已被其他线程获取,则进行等待。
如果采用Lock,必须主动去释放锁,并且在发生异常时,不会自动释放锁。因此一般来说,使用Lock必须在try{}catch{}块中进行,并且将释放锁的操作放在finally块中进行,以保证锁一定被被释放,防止死锁的发生。通常使用Lock来进行同步的话,是以下面这种形式去使用的:
1 Lock lock = ...; 2 lock.lock(); 3 try{ 4 //处理任务 5 }catch(Exception ex){ 6 7 }finally{ 8 lock.unlock(); //释放锁 9 }
tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false,也就说这个方法无论如何都会立即返回。在拿不到锁时不会一直在那等待。
tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,在时间期限之内如果还拿不到锁,就返回false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true。
所以,一般情况下通过tryLock来获取锁时是这样使用的:
1 Lock lock = ...; 2 if(lock.tryLock()) { 3 try{ 4 //处理任务 5 }catch(Exception ex){ 6 7 }finally{ 8 lock.unlock(); //释放锁 9 } 10 }else { 11 //如果不能获取锁,则直接做其他事情 12 }
lockInterruptibly()方法比较特殊,当通过这个方法去获取锁时,如果线程正在等待获取锁,则这个线程能够响应中断,即中断线程的等待状态。也就是说,当两个线程同时通过lock.lockInterruptibly()想获取某个锁时,假若此时线程A获取到了锁,而线程B只有在等待,那么对线程B调用threadB.interrupt()方法能够中断线程B的等待过程。
由于lockInterruptibly()的声明中抛出了异常,所以lock.lockInterruptibly()必须放在try块中或者在调用lockInterruptibly()的方法外声明抛出InterruptedException。
因此lockInterruptibly()一般的使用形式如下:
1 public void method() throws InterruptedException { 2 lock.lockInterruptibly(); 3 try { 4 //..... 5 } 6 finally { 7 lock.unlock(); 8 } 9 }
注意,当一个线程获取了锁之后,是不会被interrupt()方法中断的。因为本身在前面的文章中讲过单独调用interrupt()方法不能中断正在运行过程中的线程,只能中断阻塞过程中的线程。
因此当通过lockInterruptibly()方法获取某个锁时,如果不能获取到,只有进行等待的情况下,是可以响应中断的。
而用synchronized修饰的话,当一个线程处于等待某个锁的状态,是无法被中断的,只有一直等待下去。
(2)ReentrantLock
ReentrantLock,意思是“可重入锁”。ReentrantLock是唯一实现了Lock接口的类,并且ReentrantLock提供了更多的方法。下面通过一些实例看具体看一下如何使用ReentrantLock。
例子1,lock()的正确使用方法
1 public class Test { 2 private ArrayList<Integer> arrayList = new ArrayList<Integer>(); 3 public static void main(String[] args) { 4 final Test test = new Test(); 5 6 new Thread(){ 7 public void run() { 8 test.insert(Thread.currentThread()); 9 }; 10 }.start(); 11 12 new Thread(){ 13 public void run() { 14 test.insert(Thread.currentThread()); 15 }; 16 }.start(); 17 } 18 19 public void insert(Thread thread) { 20 Lock lock = new ReentrantLock(); //注意这个地方 21 lock.lock(); 22 try { 23 System.out.println(thread.getName()+"得到了锁"); 24 for(int i=0;i<5;i++) { 25 arrayList.add(i); 26 } 27 } catch (Exception e) { 28 // TODO: handle exception 29 }finally { 30 System.out.println(thread.getName()+"释放了锁"); 31 lock.unlock(); 32 } 33 } 34 }
输出结果:
Thread-0得到了锁 Thread-1得到了锁 Thread-0释放了锁 Thread-1释放了锁
也许有朋友会问,怎么会输出这个结果?第二个线程怎么会在第一个线程释放锁之前得到了锁?原因在于,在insert方法中的lock变量是局部变量,每个线程执行该方法时都会保存一个副本,那么理所当然每个线程执行到lock.lock()处获取的是不同的锁,所以就不会发生冲突。
知道了原因改起来就比较容易了,只需要将lock声明为类的属性即可。
1 public class Test { 2 private ArrayList<Integer> arrayList = new ArrayList<Integer>(); 3 private Lock lock = new ReentrantLock(); //注意这个地方 4 public static void main(String[] args) { 5 final Test test = new Test(); 6 7 new Thread(){ 8 public void run() { 9 test.insert(Thread.currentThread()); 10 }; 11 }.start(); 12 13 new Thread(){ 14 public void run() { 15 test.insert(Thread.currentThread()); 16 }; 17 }.start(); 18 } 19 20 public void insert(Thread thread) { 21 lock.lock(); 22 try { 23 System.out.println(thread.getName()+"得到了锁"); 24 for(int i=0;i<5;i++) { 25 arrayList.add(i); 26 } 27 } catch (Exception e) { 28 // TODO: handle exception 29 }finally { 30 System.out.println(thread.getName()+"释放了锁"); 31 lock.unlock(); 32 } 33 } 34 }
这样就是正确地使用Lock的方法了。
例子2,tryLock()的使用方法
1 public class Test { 2 private ArrayList<Integer> arrayList = new ArrayList<Integer>(); 3 private Lock lock = new ReentrantLock(); //注意这个地方 4 public static void main(String[] args) { 5 final Test test = new Test(); 6 7 new Thread(){ 8 public void run() { 9 test.insert(Thread.currentThread()); 10 }; 11 }.start(); 12 13 new Thread(){ 14 public void run() { 15 test.insert(Thread.currentThread()); 16 }; 17 }.start(); 18 } 19 20 public void insert(Thread thread) { 21 if(lock.tryLock()) { 22 try { 23 System.out.println(thread.getName()+"得到了锁"); 24 for(int i=0;i<5;i++) { 25 arrayList.add(i); 26 } 27 } catch (Exception e) { 28 // TODO: handle exception 29 }finally { 30 System.out.println(thread.getName()+"释放了锁"); 31 lock.unlock(); 32 } 33 } else { 34 System.out.println(thread.getName()+"获取锁失败"); 35 } 36 } 37 }
输出结果:
Thread-0得到了锁 Thread-1获取锁失败 Thread-0释放了锁
例子3,lockInterruptibly()响应中断的使用方法:
1 public class Test { 2 private Lock lock = new ReentrantLock(); 3 public static void main(String[] args) { 4 Test test = new Test(); 5 MyThread thread1 = new MyThread(test); 6 MyThread thread2 = new MyThread(test); 7 thread1.start(); 8 thread2.start(); 9 10 try { 11 Thread.sleep(2000); 12 } catch (InterruptedException e) { 13 e.printStackTrace(); 14 } 15 thread2.interrupt(); 16 } 17 18 public void insert(Thread thread) throws InterruptedException{ 19 lock.lockInterruptibly(); //注意,如果需要正确中断等待锁的线程,必须将获取锁放在外面,然后将InterruptedException抛出 20 try { 21 System.out.println(thread.getName()+"得到了锁"); 22 long startTime = System.currentTimeMillis(); 23 for( ; ;) { 24 if(System.currentTimeMillis() - startTime >= Integer.MAX_VALUE) 25 break; 26 //插入数据 27 } 28 } 29 finally { 30 System.out.println(Thread.currentThread().getName()+"执行finally"); 31 lock.unlock(); 32 System.out.println(thread.getName()+"释放了锁"); 33 } 34 } 35 } 36 37 class MyThread extends Thread { 38 private Test test = null; 39 public MyThread(Test test) { 40 this.test = test; 41 } 42 @Override 43 public void run() { 44 45 try { 46 test.insert(Thread.currentThread()); 47 } catch (InterruptedException e) { 48 System.out.println(Thread.currentThread().getName()+"被中断"); 49 } 50 } 51 }
运行之后,发现thread2能够被正确中断。
(3)ReadWriteLock
ReadWriteLock也是一个接口,在它里面只定义了两个方法:
1 public interface ReadWriteLock { 2 /** 3 * Returns the lock used for reading. 4 * 5 * @return the lock used for reading. 6 */ 7 Lock readLock(); 8 9 /** 10 * Returns the lock used for writing. 11 * 12 * @return the lock used for writing. 13 */ 14 Lock writeLock(); 15 }
一个用来获取读锁,一个用来获取写锁。也就是说将文件的读写操作分开,分成2个锁来分配给线程,从而使得多个线程可以同时进行读操作。下面的ReentrantReadWriteLock实现了ReadWriteLock接口。
(4)ReentrantReadWriteLock
ReentrantReadWriteLock里面提供了很多丰富的方法,不过最主要的有两个方法:readLock()和writeLock()用来获取读锁和写锁。
下面通过几个例子来看一下ReentrantReadWriteLock具体用法。
假如有多个线程要同时进行读操作的话,先看一下synchronized达到的效果:
1 public class Test { 2 private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); 3 4 public static void main(String[] args) { 5 final Test test = new Test(); 6 7 new Thread(){ 8 public void run() { 9 test.get(Thread.currentThread()); 10 }; 11 }.start(); 12 13 new Thread(){ 14 public void run() { 15 test.get(Thread.currentThread()); 16 }; 17 }.start(); 18 19 } 20 21 public synchronized void get(Thread thread) { 22 long start = System.currentTimeMillis(); 23 while(System.currentTimeMillis() - start <= 1) { 24 System.out.println(thread.getName()+"正在进行读操作"); 25 } 26 System.out.println(thread.getName()+"读操作完毕"); 27 } 28 }
这段程序的输出结果会是,直到thread1执行完读操作之后,才会打印thread2执行读操作的信息。
Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0读操作完毕 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1读操作完毕
而改成用读写锁的话:
1 public class Test { 2 private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); 3 4 public static void main(String[] args) { 5 final Test test = new Test(); 6 7 new Thread(){ 8 public void run() { 9 test.get(Thread.currentThread()); 10 }; 11 }.start(); 12 13 new Thread(){ 14 public void run() { 15 test.get(Thread.currentThread()); 16 }; 17 }.start(); 18 19 } 20 21 public void get(Thread thread) { 22 rwl.readLock().lock(); 23 try { 24 long start = System.currentTimeMillis(); 25 26 while(System.currentTimeMillis() - start <= 1) { 27 System.out.println(thread.getName()+"正在进行读操作"); 28 } 29 System.out.println(thread.getName()+"读操作完毕"); 30 } finally { 31 rwl.readLock().unlock(); 32 } 33 } 34 }
此时打印的结果为:
Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-1正在进行读操作 Thread-0正在进行读操作 Thread-1正在进行读操作 Thread-0正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-0正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-0正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-0正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-0正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-0正在进行读操作 Thread-1正在进行读操作 Thread-1正在进行读操作 Thread-0正在进行读操作 Thread-1正在进行读操作 Thread-0正在进行读操作 Thread-1正在进行读操作 Thread-0正在进行读操作 Thread-1正在进行读操作 Thread-0正在进行读操作 Thread-1正在进行读操作 Thread-0正在进行读操作 Thread-1正在进行读操作 Thread-0正在进行读操作 Thread-1正在进行读操作 Thread-0正在进行读操作 Thread-1正在进行读操作 Thread-0读操作完毕 Thread-1读操作完毕
说明thread1和thread2在同时进行读操作。
这样就大大提升了读操作的效率。
不过要注意的是,如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁。
如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程会一直等待释放写锁。
关于ReentrantReadWriteLock类中的其他方法感兴趣的朋友可以自行查阅API文档。
Lock和synchronized的选择
总结来说,Lock和synchronized有以下几点不同:
1)Lock是一个接口,而synchronized是Java中的关键字,synchronized是内置的语言实现;
2)synchronized在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而Lock在发生异常时,如果没有主动通过unLock()去释放锁,则很可能造成死锁现象,因此使用Lock时需要在finally块中释放锁;
3)Lock可以让等待锁的线程响应中断,而synchronized却不行,使用synchronized时,等待的线程会一直等待下去,不能够响应中断;
4)通过Lock可以知道有没有成功获取锁,而synchronized却无法办到。
5)Lock可以提高多个线程进行读操作的效率。
在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时Lock的性能要远远优于synchronized。所以说,在具体使用时要根据适当情况选择。
4、原子操作类详解部分
这个包里面提供了一组原子变量类。其基本的特性就是在多线程环境下,当有多个线程同时执行这些类的实例包含的方法时,具有排他性,即当某个线程进入方法,执行其中的指令时,不会被其他线程打断,而别的线程就像自旋锁一样,一直等到该方法执行完成,才由JVM从等待队列中选择一个另一个线程进入,这只是一种逻辑上的理解。实际上是借助硬件的相关指令来实现的,不会阻塞线程(或者说只是在硬件级别上阻塞了)。可以对基本数据、数组中的基本数据、对类中的基本数据进行操作。原子变量类相当于一种泛化的volatile变量,能够支持原子的和有条件的读-改-写操作。
java.util.concurrent.atomic中的类可以分成4组:
- 标量类(Scalar):AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference
- 数组类:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray
- 更新器类:AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater
- 复合变量类:AtomicMarkableReference,AtomicStampedReference
第一组AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference这四种基本类型用来处理布尔,整数,长整数,对象四种数据,其内部实现不是简单的使用 synchronized,而是一个更为高效的方式CAS (compare and swap) + volatile和native方法,从而避免了synchronized的高开销,执行效率大为提升。
如AtomicInteger的实现片断为:
1 private static final Unsafe unsafe = Unsafe.getUnsafe(); 2 private volatile int value; 3 public final int get() { 4 return value; 5 } 6 public final void set(int newValue) { 7 value = newValue; 8 } 9 public final boolean compareAndSet(int expect, int update) { 10 return unsafe.compareAndSwapInt(this, valueOffset, expect, update); 11 }
- 构造函数(两个构造函数)
- 默认的构造函数:初始化的数据分别是false,0,0,null
- 带参构造函数:参数为初始化的数据
- set( )和get( )方法:可以原子地设定和获取atomic的数据。类似于volatile,保证数据会在主存中设置或读取
- void set()和void lazySet():set设置为给定值,直接修改原始值;lazySet延时设置变量值,这个等价于set()方法,但是由于字段是volatile类型的,因此次字段的修改会比普通字段(非volatile字段)有稍微的性能延时(尽管可以忽略),所以如果不是想立即读取设置的新值,允许在“后台”修改值,那么此方法就很有用。
- getAndSet( )方法
- 原子的将变量设定为新数据,同时返回先前的旧数据
- 其本质是get( )操作,然后做set( )操作。尽管这2个操作都是atomic,但是他们合并在一起的时候,就不是atomic。在Java的源程序的级别上,如果不依赖synchronized的机制来完成这个工作,是不可能的。只有依靠native方法才可以。
1 public final int getAndSet(int newValue) { 2 for (;;) { 3 int current = get(); 4 if (compareAndSet(current, newValue)) 5 return current; 6 } 7 }
- compareAndSet( ) 和weakCompareAndSet( )方法
- 这两个方法都是conditional modifier方法。这2个方法接受2个参数,一个是期望数据(expected),一个是新数据(new);如果atomic里面的数据和期望数据一致,则将新数据设定给atomic的数据,返回true,表明成功;否则就不设定,并返回false。JSR规范中说:以原子方式读取和有条件地写入变量但不 创建任何 happen-before 排序,因此不提供与除weakCompareAndSet目标外任何变量以前或后续读取或写入操作有关的任何保证。大意就是说调用weakCompareAndSet时并不能保证不存在happen- before的发生(也就是可能存在指令重排序导致此操作失败)。但是从Java源码来看,其实此方法并没有实现JSR规范的要求,最后效果和 compareAndSet是等效的,都调用了unsafe.compareAndSwapInt()完成操作。
1 public final boolean compareAndSet(int expect, int update) { 2 return unsafe.compareAndSwapInt(this, valueOffset, expect, update); 3 } 4 public final boolean weakCompareAndSet(int expect, int update) { 5 return unsafe.compareAndSwapInt(this, valueOffset, expect, update); 6 }
- 对于 AtomicInteger、AtomicLong还提供了一些特别的方法。
getAndIncrement( ):以原子方式将当前值加 1,相当于线程安全的i++操作。
incrementAndGet( ):以原子方式将当前值加 1, 相当于线程安全的++i操作。
getAndDecrement( ):以原子方式将当前值减 1, 相当于线程安全的i--操作。
decrementAndGet ( ):以原子方式将当前值减 1,相当于线程安全的--i操作。
addAndGet( ): 以原子方式将给定值与当前值相加, 实际上就是等于线程安全的i =i+delta操作。
getAndAdd( ):以原子方式将给定值与当前值相加, 相当于线程安全的t=i;i+=delta;return t;操作。
以实现一些加法,减法原子操作。(注意 --i、++i不是原子操作,其中包含有3个操作步骤:第一步,读取i;第二步,加1或减1;第三步:写回内存)
使用AtomicReference创建线程安全的堆栈
1 package thread; 2 import java.util.concurrent.atomic.AtomicReference; 3 public class ConcurrentStack<T> { 4 private AtomicReference<Node<T>> stacks = new AtomicReference<Node<T>>(); 5 public T push(T e) { 6 Node<T> oldNode, newNode; 7 for (;;) { // 这里的处理非常的特别,也是必须如此的。 8 oldNode = stacks.get(); 9 newNode = new Node<T>(e, oldNode); 10 if (stacks.compareAndSet(oldNode, newNode)) { 11 return e; 12 } 13 } 14 } 15 public T pop() { 16 Node<T> oldNode, newNode; 17 for (;;) { 18 oldNode = stacks.get(); 19 newNode = oldNode.next; 20 if (stacks.compareAndSet(oldNode, newNode)) { 21 return oldNode.object; 22 } 23 } 24 } 25 private static final class Node<T> { 26 private T object; 27 private Node<T> next; 28 private Node(T object, Node<T> next) { 29 this.object = object; 30 this.next = next; 31 } 32 } 33 }
虽然原子的标量类扩展了Number类,但并没有扩展一些基本类型的包装类,如Integer或Long,事实上他们也不能扩展:基本类型的包装类是不可以修改的,而原子变量类是可以修改的。在原子变量类中没有重新定义hashCode或equals方法,每个实例都是不同的,他们也不宜用做基于散列容器中的键值。
第二组AtomicIntegerArray,AtomicLongArray还有AtomicReferenceArray类进一步扩展了原子操作,对这些类型的数组提供了支持。这些类在为其数组元素提供 volatile 访问语义方面也引人注目,这对于普通数组来说是不受支持的。
他们内部并不是像AtomicInteger一样维持一个valatile变量,而是全部由native方法实现,如下
AtomicIntegerArray的实现片断:
1 private static final Unsafe unsafe = Unsafe.getUnsafe(); 2 private static final int base = unsafe.arrayBaseOffset(int[].class); 3 private static final int scale = unsafe.arrayIndexScale(int[].class); 4 private final int[] array; 5 public final int get(int i) { 6 return unsafe.getIntVolatile(array, rawIndex(i)); 7 } 8 public final void set(int i, int newValue) { 9 unsafe.putIntVolatile(array, rawIndex(i), newValue); 10 }
第三组AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater基于反射的实用工具,可以对指定类的指定 volatile
字段进行原子更新。API非常简单,但是也是有一些约束:
(1)字段必须是volatile类型的
(2)字段的描述类型(修饰符public/protected/default/private)是与调用者与操作对象字段的关系一致。也就是说 调用者能够直接操作对象字段,那么就可以反射进行原子操作。但是对于父类的字段,子类是不能直接操作的,尽管子类可以访问父类的字段。
(3)只能是实例变量,不能是类变量,也就是说不能加static关键字。
(4)只能是可修改变量,不能使final变量,因为final的语义就是不可修改。实际上final的语义和volatile是有冲突的,这两个关键字不能同时存在。
(5)对于AtomicIntegerFieldUpdater 和AtomicLongFieldUpdater 只能修改int/long类型的字段,不能修改其包装类型(Integer/Long)。如果要修改包装类型就需要使用AtomicReferenceFieldUpdater 。
netty5.0中类ChannelOutboundBuffer统计发送的字节总数,由于使用volatile变量已经不能满足,所以使用AtomicIntegerFieldUpdater 来实现的,看下面代码:
1 //定义 2 private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER = 3 AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize"); 4 5 private volatile long totalPendingSize; 6 7 8 9 //使用 10 long oldValue = totalPendingSize; 11 long newWriteBufferSize = oldValue + size; 12 while (!TOTAL_PENDING_SIZE_UPDATER.compareAndSet(this, oldValue, newWriteBufferSize)) { 13 oldValue = totalPendingSize; 14 newWriteBufferSize = oldValue + size; 15 }
关于线程池会在接下来的文章中详细描述
参考资料
- http://ifeve.com/concurrenthashmap
- 《java并发编程》
以上是关于java基础Java并发包的主要内容,如果未能解决你的问题,请参考以下文章
Java并发包下锁学习第二篇Java并发基础框架-队列同步器介绍