Java并发系列[10]----ThreadPoolExecutor源码分析

Posted 劳夫子

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发系列[10]----ThreadPoolExecutor源码分析相关的知识,希望对你有一定的参考价值。

在日常的开发调试中,我们经常会直接new一个Thread对象来执行某个任务。这种方式在任务数较少的情况下比较简单实用,但是在并发量较大的场景中却有着致命的缺陷。例如在访问量巨大的网站中,如果每个请求都开启一个线程来处理的话,即使是再强大的服务器也支撑不住。一台电脑的CPU资源是有限的,在CPU较为空闲的情况下,新增线程可以提高CPU的利用率,达到提升性能的效果。但是在CPU满载运行的情况下,再继续增加线程不仅不能提升性能,反而因为线程的竞争加大而导致性能下降,甚至导致服务器宕机。因此,在这种情况下我们可以利用线程池来使线程数保持在合理的范围内,使得CPU资源被充分的利用,且避免因过载而导致宕机的危险。在Executors中为我们提供了多种静态工厂方法来创建各种特性的线程池,其中大多数是返回ThreadPoolExecutor对象。因此本篇我们从ThreadPoolExecutor类着手,深入探究线程池的实现机制。

1. 线程池状态和线程数的表示

 1 //高3位表示线程池状态, 后29位表示线程个数
 2 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 3 private static final int COUNT_BITS = Integer.SIZE - 3;
 4 private static final int CAPACITY = (1 << COUNT_BITS) - 1;
 5 
 6 //运行状态  例:11100000000000000000000000000000
 7 private static final int RUNNING = -1 << COUNT_BITS;
 8 
 9 //关闭状态  例:00000000000000000000000000000000
10 private static final int SHUTDOWN = 0 << COUNT_BITS;
11 
12 //停止状态  例:00100000000000000000000000000000
13 private static final int STOP = 1 << COUNT_BITS;
14 
15 //整理状态  例:01000000000000000000000000000000
16 private static final int TIDYING = 2 << COUNT_BITS;
17 
18 //终止状态  例:01100000000000000000000000000000
19 private static final int TERMINATED = 3 << COUNT_BITS;
20 
21 private static int runStateOf(int c) { return c & ~CAPACITY; }
22 private static int workerCountOf(int c) { return c & CAPACITY; }
23 private static int ctlOf(int rs, int wc) { return rs | wc; }

在继续接下来的探究之前,我们先来搞清楚ThreadPoolExecutor是怎样存放状态信息和线程数信息的。ThreadPoolExecutor利用原子变量ctl来同时存储运行状态和线程数的信息,其中高3位表示线程池的运行状态(runState),后面的29位表示线程池中的线程数(workerCount)。上面代码中,runStateOf方法是从ctl取出状态信息,workerCountOf方法是从ctl取出线程数信息,ctlOf方法是将状态信息和线程数信息糅合进ctl中。具体的计算过程如下图所示。

2. 线程池各个状态的具体含义

就像人的生老病死一样,线程池也有自己的生命周期,从创建到终止,线程池在每个阶段所做的事情是不一样的。新建一个线程池时它的状态为Running,这时它不断的从外部接收并处理任务,当处理不过来时它会把任务放到任务队列中;之后我们可能会调用shutdown()来终止线程池,这时线程池的状态从Running转为Shutdown,它开始拒绝接收从外部传过来的任务,但是会继续处理完任务队列中的任务;我们也可能调用shutdownNow()来立刻停止线程池,这时线程池的状态从Running转为Stop,然后它会快速排空任务队列中的任务并转到Tidying状态,处于该状态的线程池需要执行terminated()来做相关的扫尾工作,执行完terminated()之后线程池就转为Terminated状态,表示线程池已终止。这些状态的转换图如下所示。

3. 关键成员变量的介绍

 1 //任务队列
 2 private final BlockingQueue<Runnable> workQueue;
 3 
 4 //工作者集合
 5 private final HashSet<Worker> workers = new HashSet<Worker>();
 6 
 7 //线程达到的最大值
 8 private int largestPoolSize;
 9 
10 //已完成任务总数
11 private long completedTaskCount;
12 
13 //线程工厂
14 private volatile ThreadFactory threadFactory;
15 
16 //拒绝策略
17 private volatile RejectedExecutionHandler handler;
18 
19 //闲置线程存活时间
20 private volatile long keepAliveTime;
21 
22 //是否允许核心线程超时
23 private volatile boolean allowCoreThreadTimeOut;
24 
25 //核心线程数量
26 private volatile int corePoolSize;
27 
28 //最大线程数量
29 private volatile int maximumPoolSize;
30 
31 //默认拒绝策略
32 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

在深入探究线程池的实现机制之前,我们有必要了解一下各个成员变量的作用。上面列出了主要的成员变量,除了一些用于统计的变量,例如largestPoolSize和completedTaskCount,其中大部分变量的值都是可以在构造时进行设置的。下面我们就看一下它的核心构造器。

 1 //核心构造器
 2 public ThreadPoolExecutor(int corePoolSize,
 3                           int maximumPoolSize,
 4                           long keepAliveTime,
 5                           TimeUnit unit,
 6                           BlockingQueue<Runnable> workQueue,
 7                           ThreadFactory threadFactory,
 8                           RejectedExecutionHandler handler) {
 9     if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) {
10         throw new IllegalArgumentException();
11     }    
12     if (workQueue == null || threadFactory == null || handler == null) {
13         throw new NullPointerException();
14     }
15     this.corePoolSize = corePoolSize;                  //设置核心线程数量
16     this.maximumPoolSize = maximumPoolSize;            //设置最大线程数量
17     this.workQueue = workQueue;                        //设置任务队列
18     this.keepAliveTime = unit.toNanos(keepAliveTime);  //设置存活时间
19     this.threadFactory = threadFactory;                //设置线程工厂
20     this.handler = handler;                            //设置拒绝策略
21 }

ThreadPoolExecutor有多个构造器,所有的构造器都会调用上面的核心构造器。通过核心构造器我们可以为线程池设置不同的参数,由此线程池也能表现出不同的特性。因此彻底搞懂这几个参数的含义能使我们更好的使用线程池,下面我们就来详细看一下这几个参数的含义。
corePoolSize:
核心线程数最大值,默认情况下新建线程池时并不创建线程,后续每接收一个任务就新建一个核心线程来处理,直到核心线程数达到corePoolSize。这时后面到来的任务都会被放到任务队列中等待。
maximumPoolSize:
总线程数最大值,当任务队列被放满了之后,将会新建非核心线程来处理后面到来的任务。当总的线程数达到maximumPoolSize后,将不再继续创建线程,而是对后面的任务执行拒绝策略。
workQueue:
任务队列,当核心线程数达到corePoolSize后,后面到来的任务都会被放到任务队列中,该任务队列是阻塞队列,工作线程可以通过定时或者阻塞方式从任务队列中获取任务。
keepAliveTime:
闲置线程存活时间,该参数默认情况下只在线程数大于corePoolSize时起作用,闲置线程在任务队列上等待keepAliveTime时间后将会被终止,直到线程数减至corePoolSize。也可以通过设置allowCoreThreadTimeOut变量为true来使得keepAliveTime在任何时候都起作用,这时线程数最后会减至0。

4. execute方法的执行过程

 1 //核心执行方法
 2 public void execute(Runnable command) {
 3     if (command == null) throw new NullPointerException();
 4     int c = ctl.get();
 5     //线程数若小于corePoolSize则新建核心工作者
 6     if (workerCountOf(c) < corePoolSize) {
 7         if (addWorker(command, true)) return;
 8         c = ctl.get();
 9     }
10     //否则将任务放到任务队列
11     if (isRunning(c) && workQueue.offer(command)) {
12         int recheck = ctl.get();
13         //若不是running状态则将该任务从队列中移除
14         if (!isRunning(recheck) && remove(command)) {
15             //成功移除后再执行拒绝策略
16           reject(command);
17         //若线程数为0则新增一个非核心线程
18         }else if (workerCountOf(recheck) == 0) {
19           addWorker(null, false);
20         }
21     //若队列已满则新增非核心工作者
22     }else if (!addWorker(command, false)) {
23         //若新建非核心线程失败则执行拒绝策略
24       reject(command);
25     }
26 }

execute方法是线程池接收任务的入口方法,当创建好一个线程池之后,我们会调用execute方法并传入一个Runnable交给线程池去执行。从上面代码中可以看到execute方法首先会去判断当前线程数是否小于corePoolSize,如果小于则调用addWorker方法新建一个核心线程去处理该任务,否则调用workQueue的offer方法将该任务放入到任务队列中。通过offer方法添加并不会阻塞线程,如果添加成功会返回true,若队列已满则返回false。在成功将任务放入到任务队列后,还会再次检查线程池是否是Running状态,如果不是则将刚刚添加的任务从队列中移除,然后再执行拒绝策略。如果从队列中移除任务失败,则再检查一下线程数是否为0(有可能刚好全部线程都被终止了),是的话就新建一个非核心线程去处理。如果任务队列已经满了,此时offer方法会返回false,接下来会再次调用addWorker方法新增一个非核心线程来处理该任务。如果期间创建线程失败,则最后会执行拒绝策略。

5. 工作线程的实现

 1 //工作者类
 2 private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
 3     //关联线程
 4     final Thread thread;
 5     //初始任务
 6     Runnable firstTask;
 7     //完成任务数
 8     volatile long completedTasks;
 9 
10     //构造器
11     Worker(Runnable firstTask) {
12         //抑制中断直到runWorker
13         setState(-1);
14         //设置初始任务
15         this.firstTask = firstTask;
16         //设置关联线程
17         this.thread = getThreadFactory().newThread(this);
18     }
19     
20     public void run() {
21         runWorker(this);
22     }
23     
24     //判断是否占有锁, 0代表未占用, 1代表已占用
25     protected boolean isHeldExclusively() {
26         return getState() != 0;
27     }
28 
29     //尝试获取锁
30     protected boolean tryAcquire(int unused) {
31         if (compareAndSetState(0, 1)) {
32             setExclusiveOwnerThread(Thread.currentThread());
33             return true;
34         }
35         return false;
36     }
37     
38     //尝试释放锁
39     protected boolean tryRelease(int unused) {
40         setExclusiveOwnerThread(null);
41         setState(0);
42         return true;
43     }
44 
45     public void lock() { acquire(1); }
46     public boolean tryLock() { return tryAcquire(1); }
47     public void unlock() { release(1); }
48     public boolean isLocked() { return isHeldExclusively(); }
49 
50     //中断关联线程
51     void interruptIfStarted() {
52         Thread t;
53         //将活动线程和闲置线程都中断
54         if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
55             try {
56                 t.interrupt();
57             } catch (SecurityException ignore) {
58                 //ignore
59             }
60         }
61     }
62 }

ThreadPoolExecutor内部实现了一个Worker类,用它来表示工作线程。每个Worker对象都持有一个关联线程和分配给它的初始任务。Worker类继承自AQS并实现了自己的加锁解锁方法,说明每个Worker对象也是一个锁对象。同时Worker类还实现了Runnable接口,因此每个Worker对象都是可以运行的。Worker类有一个唯一的构造器,需要传入一个初始任务给它,在构造器里面首先将同步状态设置为-1,这个操作主要是抑制中断直到runWorker方法运行,为啥要这样做?我们继续看下去,可以看到在设置完初始任务之后,马上就开始设置关联线程,关联线程是通过线程工厂的newThread方法来生成的,这时将Worker对象本身当作任务传给关联线程。因此在启动关联线程时(调用start方法),会运行Worker对象自身的run方法。而run方法里面紧接着调用runWorker方法,也就是说只有在runWorker方法运行时才表明关联线程已启动,这时去中断关联线程才有意义,因此前面要通过设置同步状态为-1来抑制中断。那么为啥将同步状态设置为-1就可以抑制中断?每个Worker对象都是通过调用interruptIfStarted方法来中断关联线程的,在interruptIfStarted方法内部会判断只有同步状态>=0时才会中断关联线程。因此将同步状态设置为-1能起到抑制中断的作用。

6. 工作线程的创建

 1 //添加工作线程
 2 private boolean addWorker(Runnable firstTask, boolean core) {
 3     retry:
 4     for (;;) {
 5         int c = ctl.get();
 6         int rs = runStateOf(c);
 7         //只有以下两种情况会继续添加线程
 8         //1.状态为running
 9         //2.状态为shutdown,首要任务为空,但任务队列中还有任务
10         if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) {
11             return false;
12         }
13         for (;;) {
14             int wc = workerCountOf(c);
15             //以下三种情况不继续添加线程:
16             //1.线程数大于线程池总容量
17             //2.当前线程为核心线程,且核心线程数达到corePoolSize
18             //3.当前线程非核心线程,且总线程达到maximumPoolSize
19             if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) {
20                 return false;
21             }
22             //否则继续添加线程, 先将线程数加一
23             if (compareAndIncrementWorkerCount(c)) {
24                 //执行成功则跳过外循环
25                 break retry;
26             }
27             //CAS操作失败再次检查线程池状态
28             c = ctl.get();
29             //如果线程池状态改变则继续执行外循环
30             if (runStateOf(c) != rs) {
31                 continue retry;
32             }
33             //否则表明CAS操作失败是workerCount改变, 继续执行内循环
34         }
35     }
36     boolean workerStarted = false;
37     boolean workerAdded = false;
38     Worker w = null;
39     try {
40         final ReentrantLock mainLock = this.mainLock;
41         w = new Worker(firstTask);
42         final Thread t = w.thread;
43         if (t != null) {
44             mainLock.lock();
45             try {
46                 int c = ctl.get();
47                 int rs = runStateOf(c);
48                 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
49                     //如果线程已经开启则抛出异常
50                     if (t.isAlive()) throw new IllegalThreadStateException();
51                     //将工作者添加到集合中
52                     workers.add(w);
53                     int s = workers.size();
54                     //记录线程达到的最大值
55                     if (s > largestPoolSize) {
56                         largestPoolSize = s;
57                     }
58                     workerAdded = true;
59                 }
60             } finally {
61                 mainLock.unlock();
62             }
63             //将工作者添加到集合后则启动线程
64             if (workerAdded) {
65                 t.start();
66                 workerStarted = true;
67             }
68         }
69     } finally {
70         //如果线程启动失败则回滚操作
71         if (!workerStarted) {
72             addWorkerFailed(w);
73         }
74     }
75     return workerStarted;
76 }

上面我们知道在execute方法里面会调用addWorker方法来添加工作线程。通过代码可以看到进入addWorker方法里面会有两层自旋循环,在外层循环中获取线程池当前的状态,如果线程池状态不符合就直接return,在内层循环中获取线程数,如果线程数超过限定值也直接return。只有经过这两重判断之后才会使用CAS方式来将线程数加1。成功将线程数加1之后就跳出外层循环去执行后面的逻辑,否则就根据不同条件来进行自旋,如果是线程池状态改变就执行外层循环,如果是线程数改变就执行内层循环。当线程数成功加1之后,后面就是去新建一个Worker对象,并在构造时传入初始任务给它。然后将这个Worker对象添加到工作者集合当中,添加成功后就调用start方法来启动关联线程。

7. 工作线程的执行

 1 //运行工作者
 2 final void runWorker(Worker w) {
 3     //获取当前工作线程
 4     Thread wt = Thread.currentThread();
 5     //获取工作者的初始任务
 6     Runnable task = w.firstTask;
 7     //将工作者的初始任务置空
 8     w.firstTask = null;
 9     //将同步状态从-1设为0
10     w.unlock();
11     boolean completedAbruptly = true;
12     try {
13         //初始任务不为空则执行初始任务, 否则从队列获取任务
14         while (task != null || (task = getTask()) != null) {
15             //确保获取到任务后才加锁
16             w.lock(); 
17             //若状态大于等于stop, 保证当前线程被中断
18             //若状态小于stop, 保证当前线程未被中断
19             //在清理中断状态时可能有其他线程在修改, 所以会再检查一次
20             if ((runStateAtLeast(ctl.get(), STOP) || 
21                 (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) {
22                 wt.interrupt();
23             }
24             try {
25                 //任务执行前做些事情
26                 beforeExecute(wt, task);
27                 Throwable thrown = null;
28                 try {
29                     //执行当前任务
30                     task.run();
31                 } catch (RuntimeException x) {
32                     thrown = x; throw x;
33                 } catch (Error x) {
34                     thrown = x; throw x;
35                 } catch (Throwable x) {
36                     thrown = x; throw new Error(x);
37                 } finally {
38                     //任务执行后做一些事情
39                     afterExecute(task, thrown);
40                 }
41             } finally {
42                 //将执行完的任务置空
43                 task = null;
44                 //将完成的任务数加一
45                 w.completedTasks++;
46                 w.unlock();
47             }
48         }
49         //设置该线程为正常完成任务
50         completedAbruptly = false;
51     } finally {
52         //执行完所有任务后将线程删除
53         processWorkerExit(w, completedAbruptly);
54     }
55 }

上面我们知道,将Worker对象添加到workers集合之后就会去调用关联线程的start方法,由于传给关联线程的Runnable就是Worker对象本身,因此会调用Worker对象实现的run方法,最后会调用到runWorker方法。我们看到上面代码,进入到runWorker方法里面首先获取了Worker对象的初始任务,然后调用unlock方法将同步状态加1,由于在构造Worker对象时将同步状态置为-1了,所以这里同步状态变回0,因此在这之后才可以调用interruptIfStarted方法来中断关联线程。如果初始任务不为空就先去执行初始任务,否则就调用getTask方法去任务队列中获取任务,可以看到这里是一个while循环,也就是说工作线程在执行完自己的任务之后会不断的从任务队列中获取任务,直到getTask方法返回null,然后工作线程退出while循环最后执行processWorkerExit方法来移除自己。如果需要在所有任务执行之前或之后做些处理,可以分别实现beforeExecute方法和afterExecute方法。

8. 任务的获取

 1 //从任务队列中获取任务
 2 private Runnable getTask() {
 3     //上一次获取任务是否超时
 4     boolean timedOut = false;
 5     retry:
 6     //在for循环里自旋
 7     for (;;) {
 8         int c = ctl.get();
 9         int rs = runStateOf(c);
10         //以下两种情况会将工作者数减为0并返回null,并直接使该线程终止:
11         //1.状态为shutdown并且任务队列为空
12         //2.状态为stop, tidying或terminated
13         if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
14             decrementWorkerCount();
15             return null死磕 java线程系列之线程池深入解析——构造方法

『图解Java并发编程系列』10张图告诉你Java并发多线程那些破事

Java并发系列[10]----ThreadPoolExecutor源码分析

并发系列1-- JAVA并发基础

并发系列1-- JAVA并发基础

并发系列1-- JAVA并发基础