阻塞队列
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了阻塞队列相关的知识,希望对你有一定的参考价值。
参考技术A 阻塞队列常用于生产者和消费者的场景,生产者就是往队列中放入元素,消费者就是从队列中获取元素,阻塞队列就是生产者存放元素的容器,而消费者也从该容器中拿元素。阻塞队列有两种常见的阻塞场景,满足这两种阻塞场景的队列就是阻塞队列,分别如下:
Java中提供了7个阻塞队列,分别如下:
ArrayBlockingQueue和LinkedBlockingQueue一般为常用的阻塞队列。
接下来通过一个Demo演示阻塞队列的用法。
这里维护了一个ArrayBlockingQueue,并指定其大小为10,创建了一个生产者线程和一个消费者线程,生产者线程在生产5个事件后睡两秒钟,消费者线程在消费完“事件 - 5”后由于从队列中拿不到元素,就会自动阻塞,等待生产者往队列中放入元素,只要队列中有生产者放入元素,就会立即唤醒消费者线程继续获取元素,详见以下Log:
下面通过分析ArrayBlockingQueue的原理加深对阻塞队列的理解。
在生产者消费模型中,生产数据和消费数据的速率不一致,如果生产数据速度快一些,消费不过来,就会导致数据丢失,这时候我们就可以使用阻塞队列来解决这个问题。
阻塞队列是一个队列,我们使用单线程生产数据,使用多线程消费数据。由于阻塞队列的特点:队列为空的时候消费者端阻塞,队列满的时候生产者端阻塞。多线程消费数据起到了加速消费的作用,使得生产的数据不会在队列里积压过多,而生产的数据也不会丢失处理。
阻塞队列和线程池原理
参考技术A 说阻塞队列之前先要明白什么是队列?队列是一种特殊的线性表,在队列中插入一个队列元素称为入队,从队列中删除一个队列元素称为出队。因为队列只允许在一端插入,在另一端删除,所以只有最早进入队列的元素才能最先从队列中删除,故队列又称为先进先出(FIFO—first in first out)线性表。什么是阻塞队列
(1)支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
(2)支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。
为了解决这种生产消费能力不均衡的问题,便有了生产者和消费者模式。生产者和消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而是通过阻塞队列来进行通信,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。
(3)抛出异常:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException("Queuefull")异常。当队列空时,从队列里获取元素会抛出NoSuchElementException异常。
返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移除方法,则是从队列里取出一个元素,如果没有则返回null。
·一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队列会阻塞住消费者线程,直到队列不为空。
·超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的时间,生产者线程就会退出。
常用阻塞队列
·ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
·LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
·PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
·DelayQueue:一个使用优先级队列实现的无界阻塞队列。
·SynchronousQueue:一个不存储元素的阻塞队列。
·LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
以上的阻塞队列都实现了BlockingQueue接口,也都是线程安全的。
所谓有界队列就是阻塞队列长度优先,如果放满那么就阻塞住,不允许往队列里面插入,无界队列就是放入的元素不加限制,但是一旦超过系统警戒值,那么JVM直接帮你干掉。同时记住非常重要的一点,无界队列也会发生阻塞,因为取元素时,如果队列为空,那么消费者也会阻塞对应的就是take。
ArrayBlockingQueue:数组实现的队列,默认不保证公平访问原则,公平访问原则,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。初始化时有参数可以设置
是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。
PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。
是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。
是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。
多了tryTransfer和transfer方法,
(1)transfer方法
如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。
(2)tryTransfer方法
tryTransfer方法是用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回,而transfer方法是必须等到消费者消费了才返回。
LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。
多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First单词结尾的方法,表示插入、获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入、获取或移除双端队列的最后一个元素。另外,插入方法add等同于addLast,移除方法remove等效于removeFirst。但是take方法却等同于takeFirst,不知道是不是JDK的bug,使用时还是用带有First和Last后缀的方法更清楚。在初始化LinkedBlockingDeque时可以设置容量防止其过度膨胀。另外,双向阻塞队列可以运用在“工作窃取”模式中。
Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。假设一个服务器完成一项任务所需时间为:T1创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。 如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。
第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
ThreadPoolExecutor的类关系
Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。ExecutorService接口继承了Executor,在其上做了一些shutdown()、submit()的扩展,可以说是真正的线程池接口;AbstractExecutorService抽象类实现了ExecutorService接口中的大部分方法;ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。ScheduledExecutorService接口继承了ExecutorService接口,提供了带"周期执行"功能ExecutorService;ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。
线程池的创建各个参数含义
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;
如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;
如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize
线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间。默认情况下,该参数只在线程数大于corePoolSize时才有用
keepAliveTime的时间单位
workQueue必须是BlockingQueue阻塞队列。当线程池中的线程数超过它的corePoolSize的时候,线程会进入阻塞队列进行阻塞等待。通过workQueue,线程池实现了阻塞功能。
创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名,当然还可以更加自由的对线程做更多的设置,比如设置所有的线程为守护线程。
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:AbortPolicy、CallerRunsPolicy、DiscardOldestPolicy、DiscardPolicy 这个用大白话解释如下
AbortPolicy:直接扔出异常 (开始骂人)
CallerRunsPolicy:用调用者所在的线程来执行任务(既然你没事喜欢让我干活,那行我没时间,你自己干)
DiscardOldestPolicy:丢弃最老的任务,丢弃最先进入队列的任务,并执行当前任务(丢弃旧爱,搂新欢)
DiscardPolicy:直接丢弃任务(直男 拒绝新欢)
线程池的工作机制
1.当前线程少于核心线程数(corePoolSize), 直接创建线程
2.如果当前线程大于核心线程 ,那么再来任务直接加入队列
3.如果队列满了,又有新的任务,maximumPoolSize大于0,那么最多能创建线程数maximumPoolSize减去corePoolSize
4.如果当前线程数为maximumPoolSize ,再来任务时候 拒绝策略开始工作
提交任务
execute()这个方法执行后不返回执行结果,无法判断线程池是否执行了该任务
submit()这个方法会返回个future,根据future的get()获取是否被执行,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
关闭线程池
关闭线程池可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别,shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断 所有没有正在执行任务的线程 。
只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。
合理地配置线程池
要想合理地配置线程池,就必须首先分析任务特性,任务分为,IO密集型、CPU密集型、混合型
CPU密集型任务应配置尽可能小的线程,如配置Ncpu+1个线程的线程池。即使当计算密集型的线程 偶尔由于页缺失故障或者其他原因暂停时,这个“额外” 的线程也能确保CPU 的时装周期不会被浪费。(页缺失(英语:Page fault,又名硬错误、硬中断、分页错误、寻页缺失、缺页中断、页故障等)指的是当软件试图访问已映射在 虚拟 地址空间 中,但是目前并未被加载在 物理内存 中的一个 分页 时,由 中央处理器 的 内存管理单元 所发出的 中断 。)
由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*Ncpu。任务应配置尽可能多的线程,因为IO操作不占用CPU,不要让CPU闲下来,应加大线程数量
混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。
以上是关于阻塞队列的主要内容,如果未能解决你的问题,请参考以下文章