java同步之线程池ThreadPoolExecutor实现原理
Posted Leo Han
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java同步之线程池ThreadPoolExecutor实现原理相关的知识,希望对你有一定的参考价值。
一般我们在java编程时为了提供程序的性能,很多时候会借助CPU多核优势,进行多线程处理,将一个大任务分给多个线程并发处理,加速处理速,而java默认提供了几种线程池实现:
Executors.newFixedThreadPool
Executors.newCachedThreadPool
Executors.newSingleThreadExecutor()
Executors.newScheduledThreadPool
而这些线程池的实现,底层大部分都是基于ThreadPoolExecutor
,我们来看下其实现:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
这是ThreadPoolExecutor
所有的构造参数,我们一个一个讲下:
- corePoolSize 核心线程数,当我们往线程池提交任务的时候,如果线程数量小于corePoolSize ,那么会新建线程处理,即使这时候其他线程是空闲的
- maximumPoolSize 线程池最大线程数量控制,maximumPoolSize >= corePoolSize , 当 corePoolSize < 运行线程数 <= maximumPoolSize 且队列没有满,这时候新提交的任务提交到队列中,如果队列满了,那么在maximumPoolSize 允许范围内新建线程。
- keepAliveTime 当线程池中活跃线程数超过corePoolSize 且线程空闲时间超过keepAliveTime,会将线程进行回收
- unit 线程活跃时间单位
- workQueue 线程池中任务队列
- threadFactory 线程工厂
- handler 当线程数超过maximumPoolSize 且workQueue 饱和时,线程池拒绝策略
我们看下,当我们提交任务到线程池中,如何运行的:
public Future<?> submit(Runnable task)
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value)
return new FutureTask<T>(runnable, value);
public void execute(Runnable command)
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 首先判断线程数是否超过corePoolSize,如果没超过,那么会新增线程来处理任务
if (workerCountOf(c) < corePoolSize)
if (addWorker(command, true))
return;
c = ctl.get();
// 线程未满,加入到workQueue中
if (isRunning(c) && workQueue.offer(command))
int recheck = ctl.get();
// 再次检查线程池状态,如果不符合要求,拒绝加入任务
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
// 队列已经满了,这时候,增加线程数
else if (!addWorker(command, false))
reject(command);
在提交一个任务之后,会将其转换为一个FutureTask
类型,在提交到线程池中去执行。所有关键模块都调用了addWorker
方法,在讲这个之前,先看下ThreadPoolExecutor是怎么控制池中线程状态的,我们需要关注下ThreadPoolExecutor如下代码段:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS; // :接受新任务或者处理队列里的任务。
private static final int SHUTDOWN = 0 << COUNT_BITS; // 不接受新任务,但仍在处理已经在队列里面的任务。
private static final int STOP = 1 << COUNT_BITS; // 不接受新任务,也不处理队列中的任务,对正在执行的任务进行中断
private static final int TIDYING = 2 << COUNT_BITS; // 所有任务都被中断, workerCount 是 0 ,整理状态
//runState 之间的转变过程:
//RUNNING -> SHUTDOWN :调用 shudown(),finalize()
//(RUNNING or SHUTDOWN) -> STOP :调用 shutdownNow()
//SHUTDOWN -> TIDYING -> workerCount ==0
//STOP -> TIDYING -> workerCount ==0
//TIDYING -> TERMINATED -> terminated() 执行完成之后
private static final int TERMINATED = 3 << COUNT_BITS;
private static int runStateOf(int c) return c & ~CAPACITY;
private static int workerCountOf(int c) return c & CAPACITY;
private static int ctlOf(int rs, int wc) return rs | wc;
private static boolean runStateLessThan(int c, int s)
return c < s;
private static boolean runStateAtLeast(int c, int s)
return c >= s;
private static boolean isRunning(int c)
return c < SHUTDOWN;
ThreadPoolExecutor中使用了一个AtomicInteger
类型来记录当前线程池中线程相关状态,初始值为:
ctl = -1 << COUNT_BITS
这个值为-536870912
,转换为二进制为:111000000000000000000000000
,一定要注意,负数在计算中是采用补码表示的。
而最大容量表示为:
CAPACITY = (1 << COUNT_BITS) - 1
这个值为536870911
,二进制表示为00011111111111111111111111111111
,因此,这两者在初始的时候,0,1位正好错开。
因此
workerCountOf
函数,正好能够返回当ctl初始化之后的变化新增数据,-isRunning
则是判断ctl的值是否 < 0,由于ctl中初始就是-536870912
,如果容量一直增加,ctl的值为0的时候,也就是达到了最大值。runStateOf
函数,则是c & ~CAPACITY
,只要c < 0,则返回的是 ~CAPACITY,永远是个负数,而当c>=0的时候,返回的是个整数
搞明白了这几个函数的作用,我们接下来看核心的addWorker
逻辑:
private boolean addWorker(Runnable firstTask, boolean core)
retry:
for (;;)
int c = ctl.get();
int rs = runStateOf(c);
// 线程池超过容量,或者线程池被SHUTDOWN ,这时候线程池不能在接收任务了
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;)
int wc = workerCountOf(c);
// 线程池工作线程数量已经超过了最大容量或者超过了(corePoolSize (添加核心线程),maximumPoolSize添加非核心线程),这时候也不能添加新线程,返回失败
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 上面的检查通过,表名可以添加woker,成功则跳出retry处的for循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 继续重试retry的for循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try
// 新建一个 Worker,建立Worker的时候,会通过构造的ThreadFactory新建一个线程与worker绑定
w = new Worker(firstTask);
final Thread t = w.thread;
// 新建Worker顺利的获取到线程
if (t != null)
// 上锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null))
// 防止worker线程已经启动
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
finally
mainLock.unlock();
if (workerAdded)
// 启动worker的线程
t.start();
workerStarted = true;
finally
if (! workerStarted)
addWorkerFailed(w);
return workerStarted;
到这里,我们看到addWorker
主要就是根据是否添加核心线程,添加了一个新的,然后启动了worker对应的线程
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
Worker(Runnable firstTask)
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
public void run()
runWorker(this);
可以看到,Worker实现了Runnable
接口并继承了AQS
,在构造的时候调用ThreadFactory创建一个线程,并将Worker自己当做一个Runnable传入到了新建的线程中去,这样,新建的线程在执行的时候,调用传入的Worker的run方法,而run方法核心为runWorker
:
final void runWorker(Worker w)
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 这一步上来是为了能够响应中断,AQS的state会被设置为0,在构造初始化的时候,AQS被设置为-1
w.unlock();
boolean completedAbruptly = true;
try
// 循环从队里中获取元素,如果getTask返回的是null,则表名需要退出当前线程
while (task != null || (task = getTask()) != null)
w.lock();
// 如果线程池已经停止了,那么停止任务,并且中断线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try
// beforeExecute这里默认是一个空实现,如果想要在任务执行前增加一些逻辑可以实现这个方法
beforeExecute(wt, task);
Throwable thrown = null;
try
// 执行任务
task.run();
catch (RuntimeException x)
thrown = x; throw x;
catch (Error x)
thrown = x; throw x;
catch (Throwable x)
thrown = x; throw new Error(x);
finally
// 这里默认也是一个空实现,如果想要在任务执行后加一些逻辑,可以实现这个方法
afterExecute(task, thrown);
finally
task = null;
w.completedTasks++;
w.unlock();
completedAbruptly = false;
finally
//线程退出
processWorkerExit(w, completedAbruptly);
// 从队列中获取任务
private Runnable getTask()
boolean timedOut = false; // Did the last poll() time out?
for (;;)
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty()))
decrementWorkerCount();
return null;
int wc = workerCountOf(c);
// allowCoreThreadTimeOut 默认为false
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 当运行线程数量超过maximumPoolSize
// 或者运行线程数量超过corePoolSize且线程空闲时间超过keepAliveTime
// 并且(运行线程数量 > 1或者任务队列为空的时候
// 这时候返回的是null,在调用处,会退出当前线程,当前线程执行完毕。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty()))
if (compareAndDecrementWorkerCount(c))
return null;
continue;
try
// 满足条件,那么会从队列取数据,没有则阻塞等待,通过是否需要超时,来调用不同的阻塞获取数据方法。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
catch (InterruptedException retry)
timedOut = false;
可以看到,线程池,底层是用Worker
去实际执行任务的,每个Worker创建时会通过ThreadFactory生成一个新的线程进行绑定,然后启动workter的thread线程,去任务队里中取数据,当(运行线程数 > maximumPoolSize 或者 运行线程数 > corePoolSize;或者allowCoreThreadTimeOut 允许核心线程超时退出 并且 运行线程数 > 1或者工作队列为空)
这时候返回的是null,上层调用线程会退出,该线程也就被回收了。
而当无法添加任务的时候,则调用reject
方法:
final void reject(Runnable command)
handler.rejectedExecution(command, this);
handler则是在构造ThreadPoolExecutor
指定,有如下几种:
AbortPolicy
直接抛出异常RejectedExecutionException
DiscardPolicy
,直接丢弃,不做任何处理DiscardOldestPolicy
,先将任务队列中poll一个任务,然后提交任务CallerRunsPolicy
不提交任务,而是在调用者线程里执行。
关闭线程
当我们想要关闭线程池的时候,可以调用shutdown
或者shutdownNow
方法。shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。通常使用shutdown
会等待所有任务执行完,而shutdownNow则不会,可能会有任务没有执行完。
而判断线程池是否停止,需要调用isTerminated
,因为调用上面两个方法之后isShutdown
立马返回true
我们看下当线程退出while消费任务干什么了,这个逻辑在processWorkerExit
:
private void processWorkerExit(Worker w, boolean completedAbruptly)
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
completedTaskCount += w.completedTasks;
workers.remove(w);
finally
mainLock.unlock();
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP))
if (!completedAbruptly)
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
以上是关于java同步之线程池ThreadPoolExecutor实现原理的主要内容,如果未能解决你的问题,请参考以下文章
python 之 并发编程(进程池与线程池同步异步阻塞非阻塞线程queue)