JUC系列Executor框架之线程池执行器
Posted 顧棟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC系列Executor框架之线程池执行器相关的知识,希望对你有一定的参考价值。
JUC系列之线程池执行器
文章目录
- JUC系列之线程池执行器
工厂类Executors
可以由工程类Executors创建不同的线程池执行器。
ThreadPoolExecutor
核心构造函数
当核心线程数量小于0时,或最大线程数量不大于0,或最大线程数小于核心线程数量,或当线程数超过corePoolSize时,线程空闲存活时长小于0时,将抛出IllegalArgumentException
。
当没有任务队列,或没有线程工厂,或没有拒绝策略时,将抛出NullPointerException
。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
参数说明
- corePoolSize:线程池中的核心线程数量
- maximumPoolSize:线程池中最大线程数量
- keepAliveTime:当线程数超过corePoolSize时,线程空闲存活时长
- unit:时间单位
- workQueue:任务队列,被提交但是尚未执行的任务的存放处
- threadFactory:线程工厂,用户创建线程,一般默认
- handler:拒绝策略,任务过多或者其他原因导致线程池无法处理任务的拒绝策略。
默认线程工程方法
工厂的默认创建线程的方法
static class DefaultThreadFactory implements ThreadFactory
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
// 线程名的规则:pool-线程池数量-thread-线程数量
DefaultThreadFactory()
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
// 创建一个非守护型新线程,线程的优先级为一般
public Thread newThread(Runnable r)
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
CachedThreadPool
Executors中有两个方法支持创建CachedThreadPool类型的线程池。
corePoolSize为0,maximumPoolSize为Integer.MAX_VALUE。这代表线程数最大不受限。
keepAliveTime为60L,表示当线程空闲超过60s之后,就会终止销毁。
队列使用的是SynchronousQueue,是一个无容量的阻塞队列。它的内部同时只能够容纳单个元素。每一个put操作需要等待一个take操作,否则不能继续添加元素。当提交任务的速度高于线程池中处理任务的速度,CachedThreadPool会不停的创建新线程,这样会占用过多的CPU和内存资源导致服务器无法正常使用。
public static ExecutorService newCachedThreadPool()
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
与上述方法不同的是,此方法提供了一个线程构造方法对象的参数,可以通过继承接口ThreadFactory
并实现newThread(Runnable r)
来自定义创建线程的方法。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
CachedThreadPool的execute运行示意图
FixedThreadPool
Executors中有两个方法支持创建FixedThreadPool类型的线程池。
corePoolSize和maximumPoolSize均采用参数nThreads的值;keepAliveTime为0L,表示当线程空闲之后,就会立即终止销毁。
public static ExecutorService newFixedThreadPool(int nThreads)
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
与上述方法不同的是,此方法提供了一个线程构造方法对象的参数,可以通过继承接口ThreadFactory
并实现newThread(Runnable r)
来自定义创建线程的方法。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
队列使用的是LinkedBlockingQueue,链表的默认和最大长度是 Integer.MAX_VALUE。使用无界队列作为工作队列会对线程池带来以下影响。
- 当线程池中的线程数达到corePoolSize后,新的任务将会在无界队列中等待。
- 由于
corePoolSize
和maximumPoolSize
均采用参数nThreads的值,因此线程池中的线程数不会超过nThreads数量。 - 由于LinkedBlockingQueue是无界队列,运行中的FixedThreadPool(未执行方法shutdown()和shutdownNow())不会拒绝任务,不去调用
RejectedExecutionHandler.rejectedExecution
。
FixedThreadPool的execute运行示意图
SingleThreadExecutor
这是一个单个worker线程的Executor,Executors中有两个方法支持创建FixedThreadPool类型的线程池。
corePoolSize和maximumPoolSize均为1;keepAliveTime为0L,表示当线程空闲之后,就会立即终止销毁。
工作队列同样使用的是LinkedBlockingQueue。
public static ExecutorService newSingleThreadExecutor()
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
与上述方法不同的是,此方法提供了一个线程构造方法对象的参数,可以通过继承接口ThreadFactory
并实现newThread(Runnable r)
来自定义创建线程的方法。
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
SingleThreadExecutor的execute运行示意图
相关源码
成员变量
//这个属性是用来存放 当前运行的worker数量以及线程池状态的
//int是32位的,这里把int的高3位拿来充当线程池状态的标志位,后29位拿来充当当前运行worker的数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//存放任务的阻塞队列
private final BlockingQueue<Runnable> workQueue;
//worker的集合,用set来存放
private final HashSet<Worker> workers = new HashSet<Worker>();
//历史达到的worker数最大值
private int largestPoolSize;
//当队列满了并且worker的数量达到maxSize的时候,执行具体的拒绝策略
private volatile RejectedExecutionHandler handler;
//超出coreSize的worker的生存时间
private volatile long keepAliveTime;
//常驻worker的数量
private volatile int corePoolSize;
//最大worker的数量,一般当workQueue满了才会用到这个参数
private volatile int maximumPoolSize;
// 当外部线程调用 awaitTermination() 方法的时候,外部线程会等待当前线程池状态为 Terminated 为止
// 等待是如何实现的?就是将外部线程封装成waitNode放入到 Condition 队列中了,waitNode.thread 就是外部线程,会被park掉(处于WAITING 状态)
// 当线程池状态变为 Termination的时候,会去唤醒这些线程,通过 termination.signAll() ,唤醒之后这些线程会进入到阻塞队列,头结点会去抢占mainLock
// 抢到锁的线程会继续执行 awaitTermin() 后面的程序,这些线程最后都会正常执行
// 简单理解,termination.await() 会将线程阻塞在这里
// termination.signAll() 会将阻塞在这里的线程依次唤醒
private final ReentrantLock mainLock = new ReentrantLock();
// 记录线程池所完成的任务总数,当worker 退出的时候会将 worker 完成的任务累计到completedTaskCount
private long completedTaskCount;
// 创建线程的时候会使用线程工厂,当我们使用 Executors.newFix.. newCache.. 创建线程池的时候使用的是 DefaultThreadFactory
// 一般不建议使用 DefaultThreadFactory,推荐自己实现 ThreadFactory,为什么不推荐使用DefaultThreadFactory?
// 因为DefaultThreadFactory创建线程的时候给赋值给线程的名字是 namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"
// 当线程池中的某个线程出现问题的时候,并不能通过该线程名字去定位到哪个地方出现了问题,这是很浪费时间的
private volatile ThreadFactory threadFactory;
// 控制核心线程数量内的线程是否可以被回收 true,可以,false 不可以
private volatile boolean allowCoreThreadTimeOut;
// 缺省的拒绝策略:采用的是 AbortPolicy 直接抛出异常的方式
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
内部状态
采用一个Integer的变量ctl,将其高3位表示线程运行状态,第29位表示线程池中的线程数量。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 位数29
private static final int COUNT_BITS = Integer.SIZE - 3;
/** 线程池容量 */
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; // 该状态的线程池会接收新任务,并处理阻塞队列中的任务;
private static final int SHUTDOWN = 0 << COUNT_BITS; // 该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
private static final int STOP = 1 << COUNT_BITS; // 该状态的线程池不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
private static final int TIDYING = 2 << COUNT_BITS; // 收到终止命令,进行线程池中任务整理状态;
private static final int TERMINATED = 3 << COUNT_BITS; // 线程池中所有的任务都已经终止;
// Packing and unpacking ctl
/** 获取当前线程池运行状态*/
private static int runStateOf(int c) return c & ~CAPACITY;
/** 获取当前线程池线程数量*/
private static int workerCountOf(int c) return c & CAPACITY;
/** 用在重置当前线程池 ctl 值的时候会用到
* rs:代表线程池状态 wc:代表当前线程池 worker(线程)数量
*
*
*/
private static int ctlOf(int rs, int wc) return rs | wc;
// 比较当前线程池ctl所表示的状态,是否小于某个状态s
// c = 1110 0000 0000 0000 0000 0000 0000 0111 < 0000 0000 0000 0000 0000 0000 0000 0000 == true
// 所有情况下,RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
private static boolean runStateLessThan(int c, int s)
return c < s;
// 比较当前线程池 ctl 所表示的状态,是否大于等于某个状态
private static boolean runStateAtLeast(int c, int s)
return c >= s;
// 小于SHUTDOWN 一定是 RUNNING 状态 SGUTDOWN == 0
private static boolean isRunning(int c)
return c < SHUTDOWN;
// 使用 CAS 的方式让 ctl 的值 + 1,成功返回true,失败返回false
private boolean compareAndIncrementWorkerCount(int expect)
return ctl.compareAndSet(expect, expect + 1);
// 使用 CAS 的方式让 ctl 的值 -1,成功返回true,失败返回false
private boolean compareAndDecrementWorkerCount(int expect)
return ctl.compareAndSet(expect, expect - 1);
// 将ctl 的值减-1,这个方法一定成功
private void decrementWorkerCount()
// 这里会一直进行重试,直到成功为止
do while (! compareAndDecrementWorkerCount(ctl.get()));
32 31 30 29 28 27 26 25 24 23 22 21 20 19 18 17 16 15 14 13 12 11 10 9 8 7 6 5 4 3 2 1 | 位的序号
-------------------------------------------------------------------------------------- |
1 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | RUNNING默认值
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | SHUTDOWN默认值
0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | STOP默认值
0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | TIDYING默认值
0 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 | TERMINATED默认值
|
0 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 | CAPACITY
内部类Worker
worker采用了AQS的独占模式。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
private static final long serialVersionUID = 6138294804551838833L;
/** Worker 内部封装的工作线程 */
final Thread thread;
/** 假设 firstTask 不为空,那么当 worker 启动后(内部的线程启动)会优先执行 firstTask,当执行完firstTask后,会当workQueue中去获取下一个任务。初始化任务,只在worker 第一次执行任务的时候执行,之后都是从workqueue中获取任务执行。当新建一个线程时,会优先将当前的任务设为此线程的第一执行任务,之后才会考虑阻塞队列中的其他任务。*/
Runnable firstTask;
/** 记录当前所完成的任务数量 */
volatile long completedTasks;
/**
* firstTask 可以为 null,为null启动后会到 queue中获取
*/
Worker(Runnable firstTask)
// 设置 AQS 独占模式为初始化状态,不能被抢占锁
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 使用线程工厂创建了一个线程,并且将当前 worker 指定为 Runnable,也就是说当 thread启动的时候,会以worker.run()为入口
this.thread = getThreadFactory().newThread(this);
// 当worker 启动的时候,会执行run()方法
public void run()
runWorker(this);
// 判断当前worker的独占锁是否被独占
// 0 表示未被占用
// 1 表示已经占用
protected boolean isHeldExclusively()
return getState() != 0;
protected boolean tryAcquire(int unused)
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
return true;
return false;
protected boolean tryRelease(int unused)
setExclusiveOwnerThread(null);
setState(0);
return true;
public void lock() acquire(1);
public boolean tryLock() return tryAcquire(1);
public以上是关于JUC系列Executor框架之线程池执行器的主要内容,如果未能解决你的问题,请参考以下文章
JUC系列Executor框架之CompletionService