Java Executor源码解析—ThreadPoolExecutor线程池的介绍和基本属性一万字
Posted 刘Java
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java Executor源码解析—ThreadPoolExecutor线程池的介绍和基本属性一万字相关的知识,希望对你有一定的参考价值。
基于JDK1.8详细介绍了ThreadPoolExecutor线程池的基本属性和构造器!
文章目录
- 1 ThreadPoolExecutor的概述
- 2 ThreadPoolExecutor的主要属性
- 3 ThreadPoolExecutor的构造器
1 ThreadPoolExecutor的概述
public class ThreadPoolExecutor
extends AbstractExecutorService
JDK线程池的关键实现,我们常用的Executors中的预定义线程池就有这个类的实例,当然也可以通过该类的构造器来创建一个自定义的线程池,提供任务执行,线程调度,线程池管理等等服务,理解了这个类,其他线程池的实现原理就很好理解了。
CallerRunsPolicy、AbortPolicy、DiscardPolicy、DiscardOldestPolicy是四个拒绝策略类。对于正在执行的线程数大于等于maxmumPoolSize以及workQueue容量已满时提交的任务,或者线程池正在关闭时的新提交的任务,线程池将会执行拒绝策略,任务会交给RejectedExecutionHandler来处理。
Worker是对线程池中工作线程的包装,用于对于传递的任务或者任务队列中的任务进行执行、中断控制等操作。这其中还涉及到AQS,即Worker实现了简单的不可重入锁。
2 ThreadPoolExecutor的主要属性
使用一个ctl原子变量来来同时记录线程池的运行状态(runState,简称rs)和线程池中线程数量(workerCount,简称wc)。int类型转换为二进制之后的最高三位保存线程池的状态,低29位保存线程数量。刚初始化ctl的时候,rs为RUNNING状态,wc为0。
另外还具有一个ReentrantLock独占锁,当改变线程池状态,比如添加工作线程、停止线程池,或者访问线程池共享参数信息比如当前线程数量的时候,因为这涉及到多个工作线程之间的共享信息比如线程池状态、工作线程数量等参数的同步,需要获取mainLock独占锁才能进行操作。
其他的属性将会在后面的构造器部分一一详解。
/**
* 用来同时记录线程池的运行状态(runState,简称rs)和线程池中线程数量(workerCount,简称wc)。ctl的值使用AtomicInteger原子类包装,能够保证数据是线程安全的。
* int类型转换为二进制之后的最高三位保存线程池的状态,低29位保存线程数量。刚初始化ctl的时候,rs为RUNNING状态,wc为0
* ReentrantReadWriteLock也是使用一个state变量保存写锁和读锁的获取信息,ConcurrentHashMap中的甚至使用一个lockState保存三种锁状态
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/**
* 线程数量掩码位数,int类型长度-3后的剩余位数,即wc所占位数为29
*/
private static final int COUNT_BITS = Integer.SIZE - 3;
/**
* 为了能正确保存线程数量,线程池的数量线程被限制为29位的最大值,即最大(2^29)-1个,而不是(2^31)-1个
*/
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//ThreadPoolExecutor中定义了线程池的状态,存储在ctl的高三位中,一共有五种
/**
* RUNNING状态,11100000000000000000000000000000
*/
private static final int RUNNING = -1 << COUNT_BITS;
/**
* SHUTDOWN状态:00000000000000000000000000000000
*/
private static final int SHUTDOWN = 0 << COUNT_BITS;
/**
* STOP状态:00100000000000000000000000000000
*/
private static final int STOP = 1 << COUNT_BITS;
/**
* TIDYING状态:01000000000000000000000000000000
*/
private static final int TIDYING = 2 << COUNT_BITS;
/**
* TERMINATED状态:01100000000000000000000000000000
*/
private static final int TERMINATED = 3 << COUNT_BITS;
//通过对ctl的拆解、组合,获取相关的数据
/**
* 获取ctl的高3位,线程池运行状态
*
* @param c 此时的ctl值
* @return ctl的高3位的int值
*/
private static int runStateOf(int c)
//将c的低29位置为0并返回结果
return c & ~CAPACITY;
/**
* 获取ctl的低29位,线程数量
*
* @param c 此时的ctl值
* @return ctl的低29位的int值
*/
private static int workerCountOf(int c)
//将c的高3位置为0并返回结果
return c & CAPACITY;
/**
* 组合rs和wc,计算ctl新值
*
* @param rs 运行状态
* @param wc 线程数量
* @return 新ctl的int值
*/
private static int ctlOf(int rs, int wc)
//两者与运算
return rs | wc;
/**
* 线程任务队列,是一个阻塞队列
* 使用isEmpty来检测队列是否为空,而不是通过poll的返回值
*/
private final BlockingQueue<Runnable> workQueue;
/**
* 当改变线程池状态,比如添加工作线程、停止线程池,或者访问线程池共享参数信息比如当前线程数量的时候
* 因为这涉及到多个线程之间的共享信息比如线程池状态、工作线程数量等参数的同步,需要获取mainLock独占锁才行
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* 包含全部工作线程的set集合
* 只有在持有mainLock锁的时候才能访问
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
* 主要是为了支持awaitTermination方法,外部线程调用awaitTermination方法之后
* 会判断线程池是否是TERMINATED状态,即终止状态,如果不是则调用线程在termination条件变量中等待,直到超时或者线程完毕
*/
private final Condition termination = mainLock.newCondition();
/**
* 记录到目前为止线程池中的拥有的最大线程数量
* 只有在持有mainLock锁的时候才能访问
*/
private int largestPoolSize;
/**
* 线程池已完成的任务数量,只有在某个Worker工作线程终止时才会更新
* 只有在持有mainLock锁的时候才能访问
*/
private long completedTaskCount;
/**
* ThreadPoolExecutor中的工作线程统一使用线程工厂来创建
* threadFactory用于保存传入的线程工厂实现,具有volatile的特性
* Executors中给出了默认实现,我们可以直接使用:Executors.defaultThreadFactory()
*/
private volatile ThreadFactory threadFactory;
/**
* 对于提交任务数超过maxmumPoolSize+workQueue之和时超出的任务,或者线程池正在关闭时的新提交的任务执行的拒绝策略,
* 任务会交给RejectedExecutionHandler的handler来处理,具有volatile的特性
* ThreadPoolExecutor中提供了默认实现:AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy
*/
private volatile RejectedExecutionHandler handler;
/**
* 空闲的工作线程的等待超时时间,具有volatile的特性
* 当存在的工作线程数量大于指定核心线程数量时,那么多余的线程会使用此超时时间,超过该时间没有工作则关闭线程
* 或者如果允许CoreThreadTimeOut,那核心线程也会使用此超时时间,超过该时间没有任务则关闭线程;否则,核心将永远等待新的任务。
*/
private volatile long keepAliveTime;
/**
* 是否允许核心线程应用空闲超时时间,具有volatile的特性
* 如果为false,那么即使核心线程空闲也会永远保持活动状态(不会被销毁)
* 如果为true,那么核心线程将会应用 keepAliveTime,在指定时间内等待工作,超时则被销毁(设置成功的要求是超时时间大于0)。
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* 线程池核心线程数量,具有volatile的特性
* 除非设置了allowCoreThreadTimeOut=true,那么核心线程永远不会被销毁
*/
private volatile int corePoolSize;
/**
* 线程池最大线程数量,具有volatile的特性
* 不能超过(2^29)-1
*/
private volatile int maximumPoolSize;
//下面的属性涉及到Java安全模型:https://developer.ibm.com/zh/articles/j-lo-javasecurity/
//一般人接触不到
/**
* 用于校验是否具有shutdown 和 shutdownNow 关闭线程池的操作权限
*/
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
/**
* 授权上下文环境对象
* 在GC标记ThreadPoolExecutor对象并调用finalize方法时调用,用于释放资源
*/
private final AccessControlContext acc;
2.1 ctl相关方法
围绕着ctl有一系列的简单的方法,它们被其他大量方法所调用,这里先介绍出来!
runStateOf(int c):获取ctl的高3位,即获取线程池运行状态值;
workerCountOf(int c):获取ctl的低29位,即获取线程数量值;
ctlOf(int rs, int wc):组合rs和wc,计算ctl新值;
runStateLessThan(int c, int s):c的运行状态值是否小于指定状态值s;
runStateAtLeast(int c, int s):c的运行状态值是否大于等于指定状态值s;
isRunning(int c):c的运行状态是否是RUNNING;
compareAndIncrementWorkerCount(int expect):尝试CAS的将ctl的WorkerCount线程数量部分自增1;
compareAndDecrementWorkerCount(int expect):尝试CAS的将ctl的WorkerCount线程数量部分自减1;
decrementWorkerCount():循环尝试CAS的将ctl的WorkerCount线程数量部分自减1,直到成功为止。只有在addWorkerFailed、processWorkerExit以及getTask方法中调用。
//通过对ctl的拆解、组合,获取相关的数据
/**
* 获取ctl的高3位,线程池运行状态
*
* @param c 此时的ctl值
* @return ctl的高3位的int值
*/
private static int runStateOf(int c)
//将c的低29位置为0并返回结果
return c & ~CAPACITY;
/**
* 获取ctl的低29位,线程数量
*
* @param c 此时的ctl值
* @return ctl的低29位的int值
*/
private static int workerCountOf(int c)
//将c的高3位置为0并返回结果
return c & CAPACITY;
/**
* 组合rs和wc,计算ctl新值
*
* @param rs 运行状态
* @param wc 线程数量
* @return 新ctl的int值
*/
private static int ctlOf(int rs, int wc)
//两者与运算
return rs | wc;
//不需要拆解ctl进行数据比较或者更改
/**
* 运行状态值是否小于指定状态值
*
* @param c 此时的ctl
* @param s 指定状态值
* @return true 是 false 否
*/
private static boolean runStateLessThan(int c, int s)
return c < s;
/**
* 运行状态值是否大于等于指定状态值
*
* @param c 此时的ctl
* @param s 指定状态值
* @return true 是 false 否
*/
private static boolean runStateAtLeast(int c, int s)
return c >= s;
/**
* 运行状态是否是RUNNING
*
* @param c 此时的ctl
* @return true 是 false 否
*/
private static boolean isRunning(int c)
return c < SHUTDOWN;
/**
* 尝试CAS的将ctl的WorkerCount线程数量部分自增1
*
* @param expect 预期的ctl值
* @return true 成功 false 失败
*/
private boolean compareAndIncrementWorkerCount(int expect)
return ctl.compareAndSet(expect, expect + 1);
/**
* 尝试CAS的将ctl的WorkerCount线程数量部分自减1
*
* @param expect 预期的ctl值
* @return true 成功 false 失败
*/
private boolean compareAndDecrementWorkerCount(int expect)
return ctl.compareAndSet(expect, expect - 1);
/**
* 循环尝试CAS的将ctl的WorkerCount线程数量部分自减1,直到成功为止
* 只有在addWorkerFailed、processWorkerExit以及getTask部分调用
*/
private void decrementWorkerCount()
do
while (!compareAndDecrementWorkerCount(ctl.get()));
2.2 线程池的状态
在上面的属性介绍中,我们知道线程池有5种状态:RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED。
RUNNING = -1 << COUNT_BITS,转换为二进制就是11100000000000000000000000000000
SHUTDOWN = 0 << COUNT_BITS,转换为二进制就是00000000000000000000000000000000
STOP = 1 << COUNT_BITS,转换为二进制就是00100000000000000000000000000000
TIDYING = 2 << COUNT_BITS,转换为二进制就是01000000000000000000000000000000
TERMINATED = 3 << COUNT_BITS,转换为二进制就是01100000000000000000000000000000
可以发现,运行状态的大小关系为:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED,这在状态转换的时候非常有用,这样可以通过大小判断状态关系。
类似于线程的状态,线程池的状态也可以转换。但是又有不同,线程状态可以循环转换、相互转换,而一旦发生线程池的状态的转换,那么该转换不可逆。下面来看看线程池状态的转换规则:
详细说明:
状态名 | 说明 | 转换 |
RUNNING | 线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。 | 新建的线程池的初始状态就是RUNNING,并且线程池中的工作线程数量为0。 |
SHUTDOWN | 线程池处在SHUTDOWN状态时,不接收新任务,但内部正在执行的任务和队列里等待的任务,会执行完,随后会清理全部工作线程。 | RUNNING状态的线程池,调用shutdown方法,或者隐式调用了finalize方法(里面有shutdown方法时线程池状态将变成SHUTDOWN。 |
STOP | 线程池处在STOP状态时,不接收新任务,不处理已添加的任务(丢弃),并且会中断正在处理的任务,随后会清理全部工作线程。 | RUNNING or SHUTDOWN状态的线程池,调用shutdownNow方法,线程池状态将变成 STOP。 |
TIDYING | 所有的任务已执行完或被终止或被丢弃,ctl记录的workerCount工作线程数量为0,线程池会变为TIDYING状态。接着会执行钩子函数terminated()。 | SHUTDOWN状态的线程池,当任务队列为空并且线程池工作线程数workerCount为0时,线程池状态就会由 SHUTDOWN 自动转换为 TIDYING状态。 STOP状态的线程池,线程池中工作线程数workerCount为0时,线程池状态就会由STOP自动转换为TIDYING状态。 |
TERMINATED | 钩子函数terminated()执行完毕,就变成TERMINATED状态,线程池彻底终止。 | TIDYING状态的线程池,在接着执行完terminated()之后,线程池状态就会由TIDYING自动转换为 TERMINATED。 |
3 ThreadPoolExecutor的构造器
ThreadPoolExecutor的构造器是创建线程池的入口,JDK提供了四个构造函数。其中参数较少的的三个构造函数内部都是调用参数最多的那一个构造函数。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue< Runnable > workQueue)
使用给定的初始参数和默认的线程工厂及默认的拒绝策略创建新的 ThreadPoolExecutor。
/**
* 使用给定的初始参数和默认的线程工厂及默认的拒绝策略创建新的 ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
//内部调用最多参数的构造器
//线程工厂传递的Executors的默认实现:Executors.defaultThreadFactory()
//拒绝策略传递的默认实现:defaultHandler
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue< Runnable > workQueue,ThreadFactory threadFactory)
使用给定的初始参数和默认的拒绝策略创建新的 ThreadPoolExecutor。
/**
* 使用给定的初始参数和默认的拒绝策略创建新的 ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)
//内部调用最多参数的构造器
//拒绝策略传递的默认实现:defaultHandler
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue< Runnable > workQueue,RejectedExecutionHandler handler)
使用给定的初始参数和默认的线程工厂创建新的 ThreadPoolExecutor。
/**
* 使用给定的初始参数和默认的线程工厂创建新的 ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
//内部调用最多参数的构造器
//线程工厂传递的Executors的默认实现:Executors.defaultThreadFactory()
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue< Runnable > workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
最后一个构造器,使用给定的初始参数创建新的 ThreadPoolExecutor,一共有7个参数。
/**
1. 使用给定的初始参数创建新的 ThreadPoolExecutor。
2. 3. @param corePoolSize 核心线程数
3. @param maximumPoolSize 最大线程数
4. @param keepAliveTime 空闲线程等待超时时间
5. @param unit 超时时间单位
6. @param workQueue 阻塞任务队列
7. @param threadFactory 线程工厂
8. @param handler 拒绝策略
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
//一系列参数校验
/*
* 如果核心线程数小于0
* 或者 如果最大线程数小于等于0
* 或者 如果最大线程数小于核心线程数
* 或者 如果空闲线程等待超时时间小于0
*
* 满足上面一项,都将抛出IllegalArgumentException异常
*/
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
/*
* 如果阻塞任务队列为null
* 或者 如果线程工厂为null
* 或者 如果拒绝策略为null
*
* 满足上面一项,都将抛出NullPointerException异常
*/
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
//初始化用于安全管理器的上下文参数
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
//初始化核心线程数
this.corePoolSize = corePoolSize;
//初始化最大线程数
this.maximumPoolSize = maximumPoolSize;
//初始化阻塞任务队列
this.workQueue = workQueue;
//初始化空闲线程等待超时时间
this.keepAliveTime = unit.toNanos(keepAliveTime);
//初始化线程工厂
this.threadFactory = threadFactory;
//初始化拒绝策略
this.handler = handler;
单从构造器来说,实际上很简单,首先对参数进行校验,随后对一些全局属性进行初始化。下面我们主要来解释构造函数中的参数的含义,首先简单的认识这些属性:
- corePoolSize:线程池核心线程数。不能小于0。
- 当提交一个任务到线程池时,如果此时线程池的线程数量小于核心线程数,那么线程池会新创建一个线程来执行任务,即使此时存在空闲线程也不例外。默认情况创建0个核心线程,如果调用了线程池的prestartAllCoreThreads()方法,线程池会立即创建并启动所有核心线程。
- maximumPoolSize:线程池最大线程数。不能小于corePoolSize,不能小于等于0。
- 当workQueue(任务队列)放不下线程任务,并且已创建的线程数小于最大线程数,则线程池会再次创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列(任务队列没有上限大小)这个参数就没什么效果。
- keepAliveTime:空闲线程等待超时时间;unit:keepAliveTime时间单位
- 当线程数量超过corePoolSize时,多余的空闲线程等待超时时间,即如果指定时间返回没有任务执行,那么该线程将被回收,直到数量减少到corePoolSize为止。
- 如果允许CoreThreadTimeOut(前提是keepAliveTime大于0),那核心线程也会使用此超时时间,超过该时间没有任务则关闭线程;否则,核心线程将永远等待新的任务。
- workQueue:阻塞任务队列。
- 当线程任务添加的速度超过所有核心线程执行速度时,新来的来不及执行的线程任务将被存放到workQueue阻塞任务队列中。
- 任务队列一定是阻塞队列,常见的有以下四种,实际上有很多种:
- ArrayBlockingQueue:有界阻塞任务队列,构造函数一定要传入具体队列容量。
- LinkedBlockingQueu:通常作为无界阻塞任务队列(构造函数不传大小会默认为Integer.MAX_VALUE ),当有大量任务提交时,容易造成内存耗尽。
- SynchronousQueue:一个没有容量的阻塞队列,会将任务同步交付给工作线程。
- PriorityBlockingQueue : 具有优先级的无界阻塞任务队列。
- threadFactor:线程工厂。
- 线程工厂用于创建工作线程,默认线程工厂:Executors.defaultThreadFactory。
- handler:拒绝策略。
- 对于正在执行的线程数等于maxmumPoolSize以及workQueue容量已满时提交的任务,或者线程池正在关闭时的新提交的任务,线程池将会执行拒绝策略,即这些任务都直接被非线程池线程处理了。
- ThreadPoolExecutor中提供了4种拒绝策略的实现:
- AbortPolicy:调用者的线程直接抛出异常,作为默认拒绝策略;
- CallerRunsPolicy:用调用者的线程执行任务;
- DiscardOldestPolicy:抛弃队列中最久的任务;
- DiscardPolicy:抛弃当前任务;
3.1. corePoolSize、workQueue,maximumPoolSize ,keepAliveTime,unit之间关系
- 默认情况下,新的线程池不会启动任何线程。线程数量小于corePoolSize时,新提交任务都将通过线程工厂创建一个新线程去执行,即使此时线程池中存在空闲的线程。
- 当创建的线程数量达到corePoolSize时,新提交的任务会判断如果有空闲的线程那么就让空闲线程去执行,没有空闲线程时新提交的任务将被放入workQueue中,当有线程执行完当前任务,就会从任务队列中拉取任务继续执行。
- 当workQueue已满,并且maximumPoolSize>corePoolSize时,此时新提交任务又会通过线程工厂创建新线程去执行。
- 对于线程数大于等于maxmumPoolSize以及workQueue容量已满时新提交的任务,或者线程池正在关闭时的新提交的任务,线程池将会执行拒绝策略,即这些任务都直接被非线程池线程处理了。
- 如果线程池中存在超过corePoolSize的线程数并且存在空闲线程。如果空闲线程在keepAliveTime(unit表示时间单位)时间范围内都没有工作时,将清理空闲线程,减少资源浪费,直到线程数量被清理减少至核心线程数为止,预留一定数量的核心资源。而通过调用allowCoreThreadTimeOut(true)方法(设置成功的要求是超时时间大于0),核心线程也将应用超时时间规则,即此时如果没有新任务,那么所有的线程都将可能被清理。
corePoolSize、maximumPoolSize、keepAliveTime+unit、ThreadFactory、RejectedExecutionHandler都可以在线程池启动(创建)之后动态调整!通过这些参数,线程池可以动态的调整线程数量,我们也可以创建属于自己的特殊的线程池。
unit表示keepAliveTime的时间单位,在TimeUnit类中有7种静态属性:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒
3.2 ThreadFactory 线程工厂
线程池统一通过ThreadFactory创建新线程,可以说是工厂模式的应用。默认使用Executors.defaultThreadFactory工厂,该工厂创建的线程全部位于同一个ThreadGroup中,并且具有pool-N-thread-M的线程命名(N表示线程池工厂编号,M表示一个工厂创建的线程编号,都是自增的)和非守护进程状态。
通过提供不同的ThreadFactory,您可以更改线程的名称,线程组,优先级,守护进程状态等。如果ThreadCactory在通过从new Thread返回null询问时未能创建线程,则执行程序将继续,但可能无法执行任何任务。
也可以通过实现ThreadFactory自定义线程工厂,案例如下:
public class ThreadPool
private static ExecutorService pool;
public static void main(String[] args)
//自定义线程池,最大线程数为3
pool = new ThreadPoolExecutor(3, 3, 1000, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(10),
//自定义线程工厂,lambda
r ->
System.out.println("线程" + r.hashCode() + "创建");
//线程命名
return new Thread(r, "threadPool-" + r.hashCode());
, new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 10; i++)
pool.execute(new ThreadTask());
pool.shutdown();
class ThreadTask implements Runnable
@Override
public void run()
//输出执行线程的名称
System.out.println("ThreadName:" + Thread.currentThread().getName());
上面是通过匿名内部类的lambda形式创建ThreadFactory实例的,其他框架也提供了多种现成的方式:
- spring自带的的CustomizableThreadFactory。可以通过构造器和方法设置参数。
- guava的ThreadFactoryBuilder。可以通过方法的链式调用设置参数。
- commons-lang3的BasicThreadFactory。可以通过方法的链式调用设置参数
3.3 workQueue任务队列
任务队列用于存放提交的来不及执行的任务,一定是一个阻塞队列BlockingQueue,JDK自带的阻塞队列如下:
ArrayBlockingQueue | 有界阻塞队列 |
LinkedBlockingQueue | 无界阻塞队列 |
LinkedBlockingDeque | 无界阻塞双端队列 |
SynchronousQueue | 没有容量的阻塞队列 |
DelayQueue | 支持延时操作的无界阻塞队列 |
PriorityBlockingQueue | 任务具有优先级的无界阻塞队列 |
LinkedTransferQueue | JDK1.7的新无界阻塞队列 |
根据常用阻塞队列的类型,任务队列一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列,当然还有其他的类型。
3.3.1 直接提交队列
阻塞队列设置为SynchronousQueue。前面讲过,SynchronousQueue是一个特殊的BlockingQueue,它内部没有容量,每一个入队操作都需要匹配一个等待的出队操作或者等待被后来的出队操作匹配才能返回,出队操作同理。JUC—三万字的SynchronousQueue源码深度解析。
若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,如果使用SynchronousQueue队列,新任务不会被保存在任务队列中,总是会马上被执行。如果此时线程数量小于maximumPoolSize,则尝试创建新的线程执行;如果达到maximumPoolSize设置的最大值,则传递给执行拒绝策略去执行。
采用SynchronousQueue作为任务队列的线程池不能缓存任务,一个任务要么被执行要么被拒绝策略处理,这就是“直接提交”的由来。
public class SynchronousQueueThreadPool
public static void main(String[] args)
//maximumPoolSize设置为2,拒绝策略为AbortPolicy策略,直接抛出异常
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS,
//采用SynchronousQueue作为任务队列,不能缓存任务
new SynchronousQueue<>(),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
//核心线程应用超时等待机制
pool以上是关于Java Executor源码解析—ThreadPoolExecutor线程池的介绍和基本属性一万字的主要内容,如果未能解决你的问题,请参考以下文章
Java Executor源码解析—Executors线程池工厂以及四大内置线程池
Java Executor源码解析—ThreadPoolExecutor线程池submit方法以及FutureTask源码一万字
Java Executor源码解析—ThreadPoolExecutor线程池execute核心方法源码一万字