并发包之线程池源码解析
Posted sharedCode
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发包之线程池源码解析相关的知识,希望对你有一定的参考价值。
精品源码
前言
java中使用线程池一般离不开ThreadPoolExecutor , 各种各样的线程池,几乎都由该类 最终完成,所以这里直接介绍这个类
参数详解
corePoolSize: 核心线程池大小
maximumPoolSize : 最大线程数量
keepAliveTime : 超出核心线程池大小的线程过多少时间回收
unit : keepAliveTime的单位
workQueue : 当核心线程池已经满了的并且没有空闲的线程时,任务放入到队列中
threadFactory: 产生线程的工厂,我们可以自定义,比如说,对线程命名,根据一定的规则产生
同时可以设定线程是否是守护线程
RejectedExecutionHandler: 当队列满了,最大线程数也满了,那么就要根据拒绝策略来决定任务的去留
execute
1public void execute(Runnable command) {
2 // 判断传入的任务是否为空
3 if (command == null)
4 throw new NullPointerException();
5 //
6 int c = ctl.get();
7 // 判断运行中的线程数是否小于核心线程数
8 if (workerCountOf(c) < corePoolSize) {
9 // 如果小于,则直接添加任务。
10 if (addWorker(command, true))
11 return;
12 c = ctl.get();
13 }
14 // 当前线程池的状态为running && 队列未满,将任务放入队列成功
15 if (isRunning(c) && workQueue.offer(command)) {
16 // 再次检查 ,dubble check
17 int recheck = ctl.get();
18 // 线程池处于非running状态 && 移除任务成功
19 if (!isRunning(recheck) && remove(command))
20 // 拒绝任务
21 reject(command);
22 // 线程池处于RUNNING状态或者线程池处于非RUNNING状态但是任务移除失败会
23 // 进行这个判断,活动线程的数量==0
24 else if (workerCountOf(recheck) == 0)
25 addWorker(null, false);
26
27
28 // 队列已满,开启额外的线程,也就是最大线程池大小的那个配置,此处
29 // 第二个参数传入的是false,表示不使用核心线程,另外开启线程,只要
30 // 不大于最大maximumPoolSize
31 }else if (!addWorker(command, false))
32 //进入这里表示队列满了,最大线程也满了,因此拒绝任务
33 reject(command);
34 }
addWorker
1private boolean addWorker(Runnable firstTask, boolean core) {
2 retry:
3 for (;;) {
4 int c = ctl.get();
5 //运行状态
6 int rs = runStateOf(c);
7
8 // rs >= SHUTDOWN 表示当前线程池处于要关闭的状态,不再接受新任务了
9 // rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask!=null || workQueue.isEmpty())
10 // 这个是当workQueue不为空,传入的firstTask为空的时候,继续执行下去,因为是在线程池
11 // 即将关闭的情况,继续将剩余的任务处理完
12 if (rs >= SHUTDOWN &&
13 ! (rs == SHUTDOWN &&
14 firstTask == null &&
15 ! workQueue.isEmpty()))
16 return false;
17
18 for (;;) {
19 //获取当前运行的线程的数量
20 int wc = workerCountOf(c);
21 //core表示是否使用核心线程,false表示使用非核心线程
22 if (wc >= CAPACITY ||
23 wc >= (core ? corePoolSize : maximumPoolSize))
24 // 活动线程数量大于核心线程(最大线程),返回false,添加失败
25 return false;
26 // CAS 操作,为workCount + 1
27 if (compareAndIncrementWorkerCount(c))
28 break retry;
29 c = ctl.get(); // 重新比对状态,如果状态发生变更,则继续循环检验
30 if (runStateOf(c) != rs)
31 continue retry;
32 // else CAS failed due to workerCount change; retry inner loop
33 }
34 }
35
36 boolean workerStarted = false;
37 boolean workerAdded = false;
38 Worker w = null;
39 try {
40 // 创建一个worker
41 w = new Worker(firstTask);
42 // 获取worker中的线程
43 final Thread t = w.thread;
44 if (t != null) {
45 // 获取一个锁
46 final ReentrantLock mainLock = this.mainLock;
47 mainLock.lock();
48 try {
49 // Recheck while holding lock.
50 // Back out on ThreadFactory failure or if
51 // shut down before lock acquired.
52 // 获取当前线程池的状态
53 int rs = runStateOf(ctl.get());
54 // 如果线程的状态为running或者SHOWDOWN&&fisrtTask==null
55 // fisrtTask==null表示线程池即将关闭或者已经关闭,需要将剩余
56 // 的任务执行完毕
57 if (rs < SHUTDOWN ||
58 (rs == SHUTDOWN && firstTask == null)) {
59 // 判断线程的运行状态,如果正在运行,则抛异常
60 if (t.isAlive()) // precheck that t is startable
61 throw new IllegalThreadStateException();
62 // 添加worker
63 workers.add(w);
64 int s = workers.size();
65 if (s > largestPoolSize)
66 largestPoolSize = s;
67 // 添加成功
68 workerAdded = true;
69 }
70 } finally {
71 // 解锁
72 mainLock.unlock();
73 }
74 if (workerAdded) {
75 // 启动线程
76 t.start();
77 workerStarted = true;
78 }
79 }
80 } finally {
81 if (! workerStarted)
82 // 添加失败
83 addWorkerFailed(w);
84 }
85 return workerStarted;
86 }
worker
worker是一个继承了AbstractQueuedSynchronizer ,实现了Runnable接口的任务类。
1Worker(Runnable firstTask) {
2 // 设置状态为-1 ,这个主要是使用AQS的CAS操作,来维护状态,主要在关闭线程的时候使用。
3 setState(-1);
4 this.firstTask = firstTask;
5 // 通过线程工厂创建线程
6 this.thread = getThreadFactory().newThread(this);
7}
既然是实现了Runnable的任务类,那么直接看他的run方法。
1public void run() {
2 runWorker(this);
3}
看一下runWorker方法的实现
1final void runWorker(Worker w) {
2 // 获取当前线程
3 Thread wt = Thread.currentThread();
4 // 获取任务
5 Runnable task = w.firstTask;
6 // 释放worker中的任务
7 w.firstTask = null;
8 // 释放state =1 , 在创建worker的时候调用setState(-1)不允许中断
9 // 这里修改为 1 ,表示可以中断
10 w.unlock(); // allow interrupts
11 boolean completedAbruptly = true;
12 try {
13 // 任务不为空,或者 getTask()获取到了任务
14 while (task != null || (task = getTask()) != null) {
15 // 锁
16 w.lock();
17 // If pool is stopping, ensure thread is interrupted;
18 // if not, ensure thread is not interrupted. This
19 // requires a recheck in second case to deal with
20 // shutdownNow race while clearing interrupt
21 //如果线程池停止,则确保线程中断,如果没有,确保线程不中断
22 if ((runStateAtLeast(ctl.get(), STOP) ||
23 (Thread.interrupted() &&
24 runStateAtLeast(ctl.get(), STOP))) &&
25 !wt.isInterrupted())
26 wt.interrupt();
27 try {
28 // 执行之前干点啥事
29 beforeExecute(wt, task);
30 Throwable thrown = null;
31 try {
32 // 开始执行任务
33 task.run();
34 } catch (RuntimeException x) {
35 thrown = x; throw x;
36 } catch (Error x) {
37 thrown = x; throw x;
38 } catch (Throwable x) {
39 thrown = x; throw new Error(x);
40 } finally {
41 // 执行完了,干点啥事
42 afterExecute(task, thrown);
43 }
44 } finally {
45 // 释放任务
46 task = null;
47 w.completedTasks++;
48 // 解锁
49 w.unlock();
50 }
51 }
52 completedAbruptly = false;
53 } finally {
54 // 退出
55 processWorkerExit(w, completedAbruptly);
56 }
57 }
由上面的代码可以看出,如果传入的task不为空,或者getTask不为空,则线程一直运行,当获取不到任务了,则现场自动停止。 首次运行会传入Task, 所以接下来主要看下getTask方法,看下线程是如何获取任务的,获取任务的时候做了什么操作
1private Runnable getTask() {
2 // 是否超时
3 boolean timedOut = false; // Did the last poll() time out?
4 for (;;) {
5 // 线程池
6 int c = ctl.get();
7 // 线程池状态
8 int rs = runStateOf(c);
9
10 // 如果线程池已经关闭,或者队列已经空了,则返回null,并且对workcount-1
11 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
12 decrementWorkerCount();
13 return null;
14 }
15 // 当前运行的线程数量
16 int wc = workerCountOf(c);
17
18 // Are workers subject to culling?
19 // 是否允许核心线程过期 || 当前运行数量>核心线程数
20 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
21 //( 如果当前运行线程>最大线程数 || 可以过期 ) &&
22 // (活动线程数大于1 || 队列为空) , 则返回null
23 // 表示该线程会被销毁
24 if ((wc > maximumPoolSize || (timed && timedOut))
25 && (wc > 1 || workQueue.isEmpty())) {
26 if (compareAndDecrementWorkerCount(c))
27 return null;
28 continue;
29 }
30
31 try {
32 // 获取任务
33 Runnable r = timed ?
34 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
35 workQueue.take();
36 if (r != null)
37 return r;
38 timedOut = true;
39 } catch (InterruptedException retry) {
40 timedOut = false;
41 }
42 }
43 }
上面的代码,前面一部分都是对状态,对是否销毁线程做的一系列判断。 真正核心的代码是这一行。
1 Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
timed 表示 当前活动线程数是否大于核心线程数,如果大于,则说明需要销毁线程,因此使用poll从任务队列中获取数据,阻塞 keepAliveTime , 如果没有获取到数据,那么下次运行则会被销毁。
如果timed为false, 则表示当前活动线程数小于等于核心线程数,这个时候就是调用take()方法从队列当中去取任务, 也就是说,核心线程在空闲的时候是处于阻塞状态的。
拒绝策略
AbortPolicy
直接拒绝,报异常
1public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
2 throw new RejectedExecutionException("Task " + r.toString() +
3 " rejected from " +
4 e.toString());
5}
CallerRunsPolicy
直接在execute方法中调用线程的run方法,如果线程池已经关闭,则丢弃该任务
1public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
2 if (!e.isShutdown()) {
3 r.run();
4 }
5}
DiscardOldestPolicy
丢弃最早的那个任务,也就是队列中的第一个任务,然后重新调用execute方法
1public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
2 if (!e.isShutdown()) {
3 e.getQueue().poll();
4 e.execute(r);
5 }
6}
DiscardPolicy
直接丢弃,不做任何处理
1 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
线程池类型
以ThreadPoolExecutor 为基础,可以通过Executors创建各种类型的线程, 此处列出常见的四中线程池
固定大小线程池
1 public static ExecutorService newFixedThreadPool(int nThreads) {
2 return new ThreadPoolExecutor(nThreads, nThreads,
3 0L, TimeUnit.MILLISECONDS,
4 new LinkedBlockingQueue<Runnable>());
5 }
核心线程数和最大线程数一致,空闲时间为0秒, 这就说明线程池中的线程数量是固定的
长度为1的线程池
1public static ExecutorService newSingleThreadExecutor() {
2 return new FinalizableDelegatedExecutorService
3 (new ThreadPoolExecutor(1, 1,
4 0L, TimeUnit.MILLISECONDS,
5 new LinkedBlockingQueue<Runnable>()));
6 }
核心线程数和最大线程数都是1
定时线程池
1public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
2 return new ScheduledThreadPoolExecutor(corePoolSize);
3}
4public ScheduledThreadPoolExecutor(int corePoolSize) {
5 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
6 new DelayedWorkQueue());
7}
定时器线程,通过参数确定核心线程数,最大线程数去int的最大值, 这里最重要的一点,就是任务队列使用了
DelayWorkQueue() , 这个队列里面的任务,可以知道过期时间,只有到了时间才会被取出来执行,由此做到定时的概念。
缓存线程池
1 public static ExecutorService newCachedThreadPool() {
2 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
3 60L, TimeUnit.SECONDS,
4 new SynchronousQueue<Runnable>());
5}
核心线程数为0 ,最大线程数为int的最大值,缓存时间为60秒,说明当线程池里面的线程没有任务来处理的时候,会再60秒之后进行销毁。
任务队列使用的是SynchronousQueue,这是一个没有长度的队列,每一个put操作都要等待take操作。
因此使用执行大量短生命周期的异步任务,可以显著提高性能。
注意点: 由于这是一个无定长的线程,并且允许线程空闲60秒, 在使用的时候要注意线程的创建,如果创建了大量的线程,有可能会造成系统崩溃。
以上是关于并发包之线程池源码解析的主要内容,如果未能解决你的问题,请参考以下文章