内存模型的相关概念
算计机在执行程序时,每条指令都是在CPU中执行的,而执行指令过程中势必涉及到数据的读取和写入。当程序在运行时,会将运算需要的数据从主存复制一份到CPU的高速缓存,CPU进行计算时就可以直接从他的高速缓存读取数据和向其中写入数据,当运算结束后,在将高速缓存中的数据刷新到主存当中。
每个线程会有自己的高速缓存,当我们调用两个线程执行: i=i+1;
我们希望执行过后 i=2,而事实并不一定如此,初始时两个线程分别读取 i 的值到各自的高速缓存,线程1加1然后把最新值 1 写入到内存,此时线程2中的 i 还是 0 ,加一操作后 i 值 为 1,然后把线程 2 中的 i 写入到 内存。
这就是著名的缓存一致性问题。通常这种被多线程访问的变量成为共享变量。
并发编程中的三大概念
1、原子性
银行账户转账问题
从A中取出钱,向B中存入钱,必须保证两个操作要么全部执行,要么全不执行。
2、可见性(volatile可以保证可见性)
当多个线程访问同一个变量时,一个线程修改了这变量的值,其他线程能够及时看到修改的值。
3、有序性
即程序执行的顺序按照代码先后顺序执行。
Java内存模型中,允许编译器和处理器对指令进行重排序,但是只会重排数据无关的语句,保证单线程内的运行结果一致。
另外Java内存模型具有一些先天的有序性, happens-before 原则。
1)程序次序规则,一个线程内,按照代码顺序,书写在前面的操作现行发生于书写在后面的操作
2)锁定规则:一个unLock操作先行发生于同一个锁的lock操作
3)volatile变量规则:对于一个的写操作先行发生于后面对这个变量的读操作。
4)传递规则:A先于B,B先于C,则A先于C。
5)线程启动规则:Thread对象的start()方法先行发生于此线程的每一个动作。
6)线程中断规则:线程 interrupt() 方法的调用先行发生于被中断线程的代码检测到中断事件的发生。
7)线程终结规则:线程中所有的操作都先行发生与线程的终止检测,我们可以通过Thread.join()方法时结束、Thread.isAlive() 的返回值手段检测到线程已经终止执行。
8)对象终结规则,一个对象的初始化完成先行发生于他的 finalize() 方法的开始。
1、Synchronized 修饰
2、volatile实现同步 (只能保证可见性,不能保证原子性)
1)使用volatile关键字会强制将修改的缓存值立即写入主存。
2)使用volatile关键字,当线程2进行修改时,会导致线程1的工作内存中变量缓存无效,然后线程1读取时发现自己的缓存无效他会等待缓存行对应的主存地址被更新之后,然后去主存读取最新信息。
3)禁止指令重排序
(1)当程序执行到volatile变量的读操作或者写操作时,其前面的操作的更改肯定全部已经进 行,且结果已经对后面的操作可见,在其后面的操作还没有进行。
(2)在进行指令优化时,不能将在对volatile变量访问的语句放在其后面执行,也不能把 volatile变量后面的语句放在其前面执行。
在下列代码中: 语句3不会在 1、2之前执行,也不会在 4、5 之后执行,但 1和2的顺序, 4和5的顺序是无法保证的。 volatile 保证,执行到 3 时 1、2必定执行,且对 4、5 可见。
// x y 非volatile // z 为 volatile修饰 x = 2; //语句1 y = 0;//语句2 z = true;//语句3 x = 1;//语句4 y = -1;//语句5
使用场景:1)对变量的写操作不依赖当前值。2)该变量没有包含在具有其他变量的不变式中
也就是变量取值时值的内容独立于程序其他部分(赋值语句为原子性操作)。
3、使用局部变量ThreadLocal
(转载自:链接:https://www.zhihu.com/question/23089780/answer/62097840) ThreadLocal类用来提供线程内部的局部变量。这种变量在多线程环境下访问(通过get或set方法访问)时能保证各个线程里的变量相对独立于其他线程内的变量。ThreadLocal实例通常来说都是private static类型的,用于关联线程和线程的上下文。 可以总结为一句话:ThreadLocal的作用是提供线程内的局部变量,这种变量在线程的生命周期内起作用,减少同一个线程内多个函数或者组件之间一些公共变量的传递的复杂度。 举个例子,我出门需要先坐公交再做地铁,这里的坐公交和坐地铁就好比是同一个线程内的两个函数,我就是一个线程,我要完成这两个函数都需要同一个东西:公交卡(北京公交和地铁都使用公交卡),那么我为了不向这两个函数都传递公交卡这个变量(相当于不是一直带着公交卡上路),我可以这么做:将公交卡事先交给一个机构,当我需要刷卡的时候再向这个机构要公交卡(当然每次拿的都是同一张公交卡)。这样就能达到只要是我(同一个线程)需要公交卡,何时何地都能向这个机构要的目的。 有人要说了:你可以将公交卡设置为全局变量啊,这样不是也能何时何地都能取公交卡吗?但是如果有很多个人(很多个线程)呢?大家可不能都使用同一张公交卡吧(我们假设公交卡是实名认证的),这样不就乱套了嘛。现在明白了吧?这就是ThreadLocal设计的初衷:提供线程内部的局部变量,在本线程内随时随地可取,隔离其他线程。
主要方法:
1 public class ThreadLocalTest { 2 public static void main(String[] args) { 3 /** 4 * 5 * 内部属性 6 private final int threadLocalHashCode = nextHashCode(); 7 8 private static AtomicInteger nextHashCode = new AtomicInteger(); 9 10 private static final int HASH_INCREMENT = 0x61c88647; 11 12 */ 13 14 /** 15 * 构造函数为空 ,不作处理 16 * public ThreadLocal() { } 17 */ 18 ThreadLocal<Integer> threadLocal = new ThreadLocal(); 19 /** 20 * 创建一个线程局部变量ThreadLocal 21 */ 22 Integer i = 1; 23 threadLocal.set(i); 24 /** 25 * 取出线程局部变量ThreadLocal 26 */ 27 Integer j = null; 28 j =threadLocal.get(); 29 /** 30 * 解除当前线程绑定的ThreadLocal 31 */ 32 threadLocal.remove(); 33 }}
测试:
1 public class ThreadLocalTest { 2 private static final ThreadLocal<Integer> value = new ThreadLocal() { 3 @Override 4 protected Object initialValue() { 5 return 0; 6 } 7 8 }; 9 10 static class MyThread implements Runnable { 11 private int index; 12 13 public void MyThread(int index) { 14 this.index = index; 15 } 16 17 @Override 18 public void run() { 19 System.out.println("线程" + index + "的初始value:" + value.get()); 20 for (int i = 0; i < 10; i++) { 21 value.set(value.get() + 1); 22 } 23 System.out.println("线程" + index + "的累加value:" + value.get()); 24 } 25 26 } 27 28 public static void main(String[] args) { 29 for (int i = 0; i < 5; i++) { 30 new Thread(new MyThread()).start(); 31 32 } 33 } 34 }
1 结果: 2 3 线程0的初始value:0 4 线程0的初始value:0 5 线程0的累加value:10 6 线程0的累加value:10 7 线程0的初始value:0 8 线程0的初始value:0 9 线程0的初始value:0 10 线程0的累加value:10 11 线程0的累加value:10 12 线程0的累加value:10
4.原子类(AtomicInteger、AtomicBoolean……)
原子类是基本类的原子化版本,通过线程安全的方式操作,等同于自动加synchronized。
如: Integer 与AtomicInteger:
1 import java.util.concurrent.atomic.AtomicInteger; 2 public class IntegerTest { 3 static Integer i = 0; 4 static public void increase() { 5 synchronized(i){ 6 ++i; 7 i++; 8 } 9 System.out.println(i); 10 } 11 public static void main(String[] args) { 12 increase(); 13 } 14 }
1 import java.util.concurrent.atomic.AtomicInteger; 2 public class AtomicIntegerTest { 3 public static void main(String[] args) { 4 /** 5 * AtomicIntegerTest 线程安全 6 */ 7 int i = 0; 8 AtomicInteger atomicInteger = new AtomicInteger(i); 9 //相当于 ++i 10 i = atomicInteger.incrementAndGet(); 11 //相当于 i++ 12 i = atomicInteger.getAndIncrement(); 13 System.out.println(i); 14 } 15 }
AtomicInteger.compareAndSet(int expect, int update)
如果当前值 == 预期值,则以原子方式将该值设置为给定的更新值。
AtomicReference
赋值操作不是线程安全的。若想不用锁来实现,可以用AtomicReference<V>这个类,实现对象引用的原子更新。
常用方法
构造函数。
java.util.concurrent.atomic.AtomicReference.AtomicReference(V initialValue)
返回当前的引用。
java.util.concurrent.atomic.AtomicReference.get()
如果当前值与给定的expect相等,(注意是引用相等而不是equals()相等),更新为指定的update值。
boolean java.util.concurrent.atomic.AtomicReference.compareAndSet(V expect, V update)
原子地设为给定值并返回旧值。
java.util.concurrent.atomic.AtomicReference.getAndSet(V newValue)
注意此方法不是原子的。不明白为什么要提供这个方法,很容易误用。
void java.util.concurrent.atomic.AtomicReference.set(V newValue)
5、使用Lock
lock: 在java.util.concurrent包内。共有三个实现:
ReentrantLock
ReentrantReadWriteLock.ReadLock
ReentrantReadWriteLock.WriteLock
主要目的是和synchronized一样, 两者都是为了解决同步问题,处理资源争端而产生的技术。功能类似但有一些区别。
区别如下:
lock更灵活,可以自由定义多把锁的枷锁解锁顺序(synchronized要按照先加的后解顺序)
提供多种加锁方案,lock 阻塞式, trylock 无阻塞式, lockInterruptily 可打断式, 还有trylock的带超时时间版本。
本质上和监视器锁(即synchronized是一样的)
能力越大,责任越大,必须控制好加锁和解锁,否则会导致灾难。
和Condition类的结合。
线程越多,性能相对更高,
ReentrantLock
可重入的意义在于持有锁的线程可以继续持有,并且要释放对等的次数后才真正释放该锁。
使用方法是:
1.先new一个实例
static ReentrantLock r=new ReentrantLock();
2.加锁
1 //加锁 2 //后者可被打断。 3 //当a线程lock后,b线程阻塞,此时如果是lockInterruptibly,那么在调用b.interrupt()之、后,b线程退出阻塞,并放弃对资源的争抢,进入catch块。 4 //(如果使用后者,必须throw interruptable exception 或catch) 5 r.lock()或r.lockInterruptibly()
3.释放锁
r.unlock()
必须做!何为必须做呢,要放在finally里面。以防止异常跳出了正常流程,导致灾难。这里补充一个小知识点,finally是可以信任的:经过测试,哪怕是发生了OutofMemoryError,finally块中的语句执行也能够得到保证。
ReentrantReadWriteLock
可重入读写锁(读写锁的一个实现)
ReentrantReadWriteLock lock = new ReentrantReadWriteLock()
ReadLock r = lock.readLock();
WriteLock w = lock.writeLock();
两者都有lock,unlock方法。写写,写读互斥;读读不互斥。可以实现并发读的高效线程安全代码
6、容器类(BlockingQueue、ConcurrentHashMap)
这里就讨论比较常用的两个:
BlockingQueue
ConcurrentHashMap
BlockingQueue
阻塞队列。该类是java.util.concurrent包下的重要类,通过对Queue的学习可以得知,这个queue是单向队列,可以在队列头添加元素和在队尾删除或取出元素。类似于一个管道,特别适用于先进先出策略的一些应用场景。普通的queue接口主要实现有PriorityQueue(优先队列),有兴趣可以研究
BlockingQueue在队列的基础上添加了多线程协作的功能:
放入数据:
offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,
则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)
则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)
offer(E o, long timeout, TimeUnit unit),可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断,直到BlockingQueue里面有空间再继续.
获取数据:
poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,
取不到时返回null;
poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;
drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),
通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
BlockingQueue成员详细介绍
1. ArrayBlockingQueue
基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug
Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。
ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。
2.LinkedBlockingQueue
基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。
ArrayBlockingQueue和LinkedBlockingQueue是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。
1 import java.util.Random; 2 import java.util.concurrent.BlockingQueue; 3 import java.util.concurrent.TimeUnit; 4 import java.util.concurrent.atomic.AtomicInteger; 5 public class Producer implements Runnable{ 6 private BlockingQueue queue; 7 private volatile boolean isRunning = true; 8 private static AtomicInteger count = new AtomicInteger(); 9 private static final int DEFAULT_FOR_SLEEP = 1000; 10 public Producer(BlockingQueue queue) { 11 this.queue = queue; 12 } 13 @Override 14 public void run() { 15 String data = null; 16 System.out.println("启动生产者线程!"); 17 Random random = new Random(); 18 try { 19 while(isRunning) { 20 System.out.println("正在生产数据!"); 21 Thread.sleep(random.nextInt(DEFAULT_FOR_SLEEP)); 22 data = "data:"+count.incrementAndGet(); 23 System.out.println("将队列 "+data+"放入队列"); 24 if (!queue.offer(data,2,TimeUnit.SECONDS)) { 25 System.out.println("放入数据失败"+data); 26 } 27 } 28 } catch (InterruptedException e) { 29 // TODO Auto-generated catch block 30 e.printStackTrace(); 31 }finally { 32 System.out.println("退出生产者线程!"); 33 } 34 } 35 public void stop() { 36 isRunning = false; 37 } 38 }
1 import java.util.Random; 2 import java.util.concurrent.BlockingQueue; 3 import java.util.concurrent.TimeUnit; 4 5 public class Consumer implements Runnable { 6 private BlockingQueue queue; 7 private boolean isRunning = true; 8 private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; 9 10 public Consumer(BlockingQueue queue) { 11 this.queue = queue; 12 } 13 @Override 14 public void run() { 15 System.out.println("启动消费者线程!"); 16 Random random = new Random(); 17 isRunning = true; 18 try { 19 while (isRunning) { 20 System.out.println("正在从队列里获取数据"); 21 String data = (String) queue.poll(2, TimeUnit.SECONDS); 22 if (null != data) { 23 System.out.println("拿到数据 :" + data); 24 System.out.println("正在消费数据:" + data); 25 Thread.sleep(random.nextInt(DEFAULT_RANGE_FOR_SLEEP)); 26 } else { 27 // 超过2s还没数据,认为所有生产线都已经退出,自动退出消费线程 28 isRunning = false; 29 } 30 } 31 } catch (InterruptedException e) { 32 e.printStackTrace(); 33 Thread.currentThread().interrupt(); 34 } finally { 35 System.out.println("退出消费者线程!"); 36 } 37 } 38 }
1 package org.clockQueue; 2 3 import java.util.concurrent.BlockingQueue; 4 import java.util.concurrent.Executor; 5 import java.util.concurrent.ExecutorService; 6 import java.util.concurrent.Executors; 7 import java.util.concurrent.LinkedBlockingQueue; 8 9 public class BlocakingQueueTest { 10 public static void main(String[] args) throws InterruptedException { 11 BlockingQueue<String> queue =new LinkedBlockingQueue(10); 12 Producer p1 = new Producer(queue); 13 Producer p2 = new Producer(queue); 14 Producer p3 = new Producer(queue); 15 Consumer c = new Consumer(queue); 16 17 ExecutorService service = Executors.newCachedThreadPool(); 18 //启动线程 19 service.execute(p1); 20 service.execute(p2); 21 service.execute(p3); 22 service.execute(c); 23 //运行10s 24 Thread.sleep(10*1000); 25 p1.stop(); 26 p2.stop(); 27 p3.stop(); 28 29 Thread.sleep(2000); 30 31 //退出executor 32 service.shutdown(); 33 34 35 } 36 }
1 启动生产者线程! 2 启动消费者线程! 3 启动生产者线程! 4 启动生产者线程! 5 正在生产数据! 6 正在生产数据! 7 正在生产数据! 8 正在从队列里获取数据 9 将队列 data:1放入队列 10 正在生产数据! 11 拿到数据 :data:1 12 正在消费数据:data:1 13 正在从队列里获取数据 14 将队列 data:2放入队列 15 正在生产数据! 16 拿到数据 :data:2 17 正在消费数据:data:2 18 将队列 data:3放入队列 19 正在生产数据! 20 将队列 data:4放入队列 21 正在生产数据! 22 将队列 data:5放入队列 23 正在生产数据! 24 将队列 data:6放入队列 25 正在生产数据! 26 正在从队列里获取数据 27 拿到数据 :data:3 28 正在消费数据:data:3 29 将队列 data:7放入队列 30 正在生产数据! 31 将队列 data:8放入队列 32 正在生产数据! 33 正在从队列里获取数据 34 拿到数据 :data:4 35 正在消费数据:data:4 36 将队列 data:9放入队列 37 正在生产数据! 38 正在从队列里获取数据 39 拿到数据 :data:5 40 正在消费数据:data:5 41 将队列 data:10放入队列 42 正在生产数据! 43 将队列 data:11放入队列 44 正在生产数据! 45 将队列 data:12放入队列 46 正在生产数据! 47 将队列 data:13放入队列 48 正在生产数据! 49 将队列 data:14放入队列 50 正在生产数据! 51 正在从队列里获取数据 52 拿到数据 :data:6 53 正在消费数据:data:6 54 将队列 data:15放入队列 55 正在生产数据! 56 将队列 data:16放入队列 57 正在生产数据! 58 将队列 data:17放入队列 59 将队列 data:18放入队列 60 正在从队列里获取数据 61 拿到数据 :data:7 62 正在消费数据:data:7 63 正在生产数据! 64 将队列 data:19放入队列 65 正在从队列里获取数据 66 拿到数据 :data:8 67 正在生产数据! 68 正在消费数据:data:8 69 将队列 data:20放入队列 70 将队列 data:21放入队列 71 正在从队列里获取数据 72 拿到数据 :data:9 73 正在生产数据! 74 正在消费数据:data:9 75 正在从队列里获取数据 76 拿到数据 :data:10 77 正在生产数据! 78 正在消费数据:data:10 79 正在从队列里获取数据 80 拿到数据 :data:11 81 正在生产数据! 82 正在消费数据:data:11 83 将队列 data:22放入队列 84 将队列 data:23放入队列 85 将队列 data:24放入队列 86 正在从队列里获取数据 87 拿到数据 :data:12 88 正在生产数据! 89 正在消费数据:data:12 90 正在从队列里获取数据 91 拿到数据 :data:13 92 正在生产数据! 93 正在消费数据:data:13 94 正在从队列里获取数据 95 拿到数据 :data:14 96 正在生产数据! 97 正在消费数据:data:14 98 正在从队列里获取数据 99 拿到数据 :data:15 100 正在消费数据:data:15 101 正在从队列里获取数据 102 拿到数据 :data:16 103 正在消费数据:data:16 104 正在从队列里获取数据 105 拿到数据 :data:17 106 正在消费数据:data:17 107 将队列 data:25放入队列 108 正在生产数据! 109 将队列 data:26放入队列 110 正在生产数据! 111 正在从队列里获取数据 112 拿到数据 :data:18 113 正在消费数据:data:18 114 将队列 data:27放入队列 115 正在生产数据! 116 将队列 data:28放入队列 117 正在生产数据! 118 将队列 data:29放入队列 119 正在从队列里获取数据 120 拿到数据 :data:19 121 正在生产数据! 122 正在消费数据:data:19 123 将队列 data:30放入队列 124 将队列 data:31放入队列 125 将队列 data:32放入队列 126 正在从队列里获取数据 127 拿到数据 :data:20 128 正在生产数据! 129 正在消费数据:data:20 130 正在从队列里获取数据 131 拿到数据 :data:21 132 正在生产数据! 133 正在消费数据:data:21 134 将队列 data:33放入队列 135 正在从队列里获取数据 136 拿到数据 :data:22 137 正在生产数据! 138 正在消费数据:data:22 139 将队列 data:34放入队列 140 正在从队列里获取数据 141 拿到数据 :data:23 142 正在生产数据! 143 正在消费数据:data:23 144 将队列 data:35放入队列 145 正在从队列里获取数据 146 拿到数据 :data:24 147 退出生产者线程! 148 正在消费数据:data:24 149 正在从队列里获取数据 150 拿到数据 :data:25 151 正在消费数据:data:25 152 退出生产者线程! 153 将队列 data:36放入队列 154 正在从队列里获取数据 155 拿到数据 :data:26 156 退出生产者线程! 157 正在消费数据:data:26 158 正在从队列里获取数据 159 拿到数据 :data:27 160 正在消费数据:data:27 161 正在从队列里获取数据 162 拿到数据 :data:28 163 正在消费数据:data:28 164 正在从队列里获取数据 165 拿到数据 :data:29 166 正在消费数据:data:29 167 正在从队列里获取数据 168 拿到数据 :data:30 169 正在消费数据:data:30 170 正在从队列里获取数据 171 拿到数据 :data:31 172 正在消费数据:data:31 173 正在从队列里获取数据 174 拿到数据 :data:32 175 正在消费数据:data:32 176 正在从队列里获取数据 177 拿到数据 :data:33 178 正在消费数据:data:33 179 正在从队列里获取数据 180 拿到数据 :data:34 181 正在消费数据:data:34 182 正在从队列里获取数据 183 拿到数据 :data:35 184 正在消费数据:data:35 185 正在从队列里获取数据 186 拿到数据 :data:36 187 正在消费数据:data:36 188 正在从队列里获取数据 189 退出消费者线程!
3. DelayQueue
DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
使用场景:
DelayQueue使用场景较少,但都相当巧妙,常见的例子比如使用一个DelayQueue来管理一个超时未响应的连接队列。
4. PriorityBlockingQueue
基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。
5. SynchronousQueue
一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的BlockingQueue来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。
声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。公平模式和非公平模式的区别:
如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。
小结
BlockingQueue不光实现了一个完整队列所具有的基本功能,同时在多线程环境下,他还自动管理了多线间的自动等待于唤醒功能,从而使得程序员可以忽略这些细节,关注更高级的功能。
ConcurrentHashMap
高效的线程安全哈希map。请对比hashTable , concurrentHashMap, HashMap