线程池
Posted Carl_Hugo
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池相关的知识,希望对你有一定的参考价值。
JAVA线程池的实现
使用线程池的好处
- 降低资源消耗
- 提高响应速度
- 提高线程的可管理性
Executor框架
作用:线程工厂,可以通过Executors创建特定功能的线程池。
线程池的种类
1. newFixedThreadPool(num)
固定数量的线程池,线程池中只存在核心线程,线程池中的核心线程数等于最大线程数,当有新的任务来临时不会开辟新的线程来处理任务。(LinkedBlockingQueue)。(一开始就创建num个线程)
2. newCachedThreadPool()
缓存线程池,线程池中线程数量不确定,理论上可达Integer.MAX_VALUE,线程池中线程都是非核心线程。线程空闲时间超过keepAliveTime会自动被回收(SynchronousQueue)。
3. newScheduledThreadPool(num)
任务线程池,线程池中核心线程的数量固定,对于非核心线程的数量不限制,非核心线程存在超时回收机制。主要用于执行定时任务和周期性任务的场景。
4. newSingleThreadExecutor()
单一线程池,非核心线程,主要确保任务在同一线程中顺序执行,类似于同步。(LinkedBlockingQueue)。
创建线程池的参数
ThreadPoolExecutor源码
public ThreadPoolExecutor(int corePoolSize,//核心线程数
int maximumPoolSize,//最大线程数
long keepAliveTime,//线程活动保持时间
TimeUnit unit,//keepAliveTime的单位
BlockingQueue<Runnable> workQueue,//阻塞队列
ThreadFactory threadFactory,//线程工厂
RejectedExecutionHandler handler)//拒绝策略
- corePoolSize:核心线程数,当提交一个任务到线程池中,线程池会创建一个线程来执行任务,直到线程数达到corePoolSize时不再创建,这个时候把提交的新任务放入阻塞队列,如果调用preStartAllCoreThread(),则会在创建线程池的时候初始化核心线程。
- maximunPoolSize:允许创建的最大线程数,如果阻塞队列满,同时创建的线程数小于最大线程数,就会创建新的线程来执行任务。
- keepAliveTime:非核心线程空闲时的存活时间,超过时间线程会被终止。
- unit:keepAliveTime的时间单位。
- workQueue:阻塞队列,有四种阻塞队列类型,ArrayBlockingQueue(基于数组)、LinkedBlockingQueue(基于链表)、SynchronousQueue(不存储元素)、PriorityBlockingQueue(具有优先级的);
- threadFactory:用于创建线程的线程工厂。
- handler:阻塞队列满且没有空闲线程,执行自定义处理策略。
任务添加到线程池之后的运行流程
调用submit或者execute方法。
调用submit(Runnable runTask)方法,需要传入一个实现Callable接口的对象,在任务执行完后通过Future对象获得任务的返回值。submit(Runnable runTask)内部实际上还是执行execute方法。
调用execute方法是不能获得任务执行结束后的返回值的,且不会抛出异常。
线程池存在的状态
ctl(AtomicInteger):利用低29位表示线程池中的线程数,高3位表示线程池的运行状态。
RUNNING:线程池会接受任务,并处理阻塞队列中的任务(高3位为111)。
SHUTDOWN:线程池不会接受任务,但是会处理阻塞队列中的任务(高3位为000)。
STOP:线程池不会接受新的任务,不会处理阻塞队列中的任务,中断正在执行的任务(高3位位001)。
TIDYING:过渡阶段,所有任务都执行结束,线程池中不存在有效线程,并且将调用terminated方法。(高三位010)
TERMINATED:终止状态(高三位011)
- shutdown()-线程池状态从Running转换为shutdown
- shutdownNow()-线程池状态从Running/Shutdown转换为Stop
- 阻塞队列为空,线程池为空,线程池状态从shutdown转换为Tidying
- 线程池为空,线程池状态从Stop转换为Tidying
- Terminated()-线程池状态从Tidying转换为Terminate
Executor方法
判断当前线程池中线程数量是否小于corePoolSIze,如果小于则通过addWorker方法创建一个新的Worker对象执行当前的任务。
如果当前线程池中线程数量大于corePoolSize,尝试将当前任务添加到阻塞队列中;二次检查线程池中的状态,如果线程池不在Running状态,则将刚刚添加到阻塞队列中的任务移除,同时拒绝当前任务请求;
如果二次检查发现当前线程池处于Running状态,那么会检查当前线程池中工作线程数量是否为0,如果为0,就通过addWorker创建一个Worker对象处理阻塞队列中的任务。
如果原先线程池不处于Running状态或者将当前任务添加到阻塞队列出错时,就通过addWorker创建新的Worker对象处理当前任务,如果添加失败,则拒绝当前任务。
executor方法仅仅检查当前线程池中线程数量有没有超过corePoolSize,maximumPoolSize是通过addWorker方法检测的。
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
当线程数量超过maximumPoolSize,就会调用return false;
一个线程池中线程的数量实际就是这个线程池中Worker的数量,如果Worker大小超过超过corePoolSize,那么任务都在阻塞队列中。Worker是对Java中任务的一个封装。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
Worker(Runnable firstTask)
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
Worker实现Runnable接口,通过构造方法可知,一般传入的对象也实现了Callable或者Runnable接口(在调用submit()会将Callable对象封装成实现了Runnable接口的对象的)。
public <T> Future<T> submit(Callable<T> task)
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
Worker对象创建后会执行内部的run方法,在run方法中会执行runWorker方法,如果传入的任务不为空,则执行任务。为空则退出runWorker方法。
getTask()返回的任务为空的情况
- worker数量超过maximumPoolSize
- 线程池状态为Stop
- 线程池状态为Shutdown且阻塞队列为空
- 通过定时从阻塞队列中获取数据,但是超时后仍然未能获取。
线程池运行的4个阶段
- poolSize
线程池时如何重用线程的
线程重用是在getTask()中进行实现的。
- getTask()中存在两个for循环,它不断从阻塞队列中取出需要执行的任务,返回给runWorker()。
- runWorker()方法判断返回对象,只要不为空就执行run()处理它。直到接受到getTask()里传来的空值(阻塞队列中没有任务),这样一个线程处理完一个任务接着处理阻塞队列中的下一个任务。
- 线程池中不同线程也是可以并发处理阻塞队列中的任务的。
- 当阻塞队列中不存在任务时就会判断是否需要回收Worker对象。
线程池的关闭
shutdown() - 不会影响阻塞队列中的任务执行,但是会拒绝执行新加入的任务同时回收闲置worker。
shutdownNow() - 线程池状态切换为stop,不会处理阻塞中任务,也不会处理新加入的任务,同时回收所有worker。
自定义线程池使用详解
1. 使用有界队列
- corePoolSize-线程池实际线程数小于corePoolSize,则优先创建线程;若大于corePoolSize则会将任务加入队列。
- maximumPoolSize-如果队列已满,则在总线程数不大于maximumPoolSize的前提下创建新的线程,如果线程数大于maximumPoolSize,则执行拒绝策略。
- keepAliveTime-线程没有任务执行时继续存活的时间。
下限:corePoolSize 上限:maximumPoolSize 缓冲:有界队列
ThreadPoolExecutor pool = new ThreadPoolExecutor(
1,//coreSize
2,//MaxSize
60,//keepAliveTime
TimeUnit.SECONDS, //时间单位
new ArrayBlockingQueue<Runnable>(3),//指定有界队列
new MyRejected()
);
2.使用无界队列
无界队列不存在入队失败的情况。(除非系统资源耗尽)
- corePoolSize-当有新的线程到来时,系统线程数小于corePoolSize,则新建线程执行任务。当达到corePoolSize后,就不会继续增加,若没有空闲的线程资源,则直接进入队列等待。
下限:corePoolSize 上限:无 缓冲:有界队列
ExecutorService executor = new ThreadPoolExecutor(
5,//core
10,//max
120L,//2min
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),);
以上是关于线程池的主要内容,如果未能解决你的问题,请参考以下文章