从源码看JDK提供的线程池(ThreadPoolExecutor)

Posted MindMrWang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从源码看JDK提供的线程池(ThreadPoolExecutor)相关的知识,希望对你有一定的参考价值。

一丶什么是线程池

(1)博主在听到线程池三个字的时候第一个想法就是数据库连接池,回忆一下,我们在学JavaWeb的时候怎么理解数据库连接池的,数据库创建连接和关闭连接是一个比较耗费资源的事情,对于那些数量多且时间短暂的任务,会导致频繁获取和释放连接,这样使得处理事务的效率大大降低,多以我们创建一个连接池,里面放了指定数目的连接,当应用需要数据库连接的时候去里面获取,使用完毕后再放到连接池里,这样就避免了重复的获取连接和释放连接,至于要获取什么样的连接池我们可以根据应用的特征,设置参数来决定。
(2)线程池和连接池很相似,线程池的产生是为了避免重复的创建线程和回收线程。本着存在即合理,存在即有优点的理念(这个说法不普遍适用),线程池有如下三个优点:

①降低资源消耗。通过重复利用已创建的线程降低线程创建、销毁线程造成的消耗。
②提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
③提高线程的可管理性。线程是稀缺资源,如果入限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配、调优和监控。


二丶ThreadPoolExecutor的使用

ThreadPoolExecutor是线程池的最核心的一个类,所以要了解线程池我们先来看看ThreadPoolExecutor类的实现。
本着先学开车后学修车的理念,我们先通过范例来学习一下ThreadPoolExecutor的使用(以后对JDK源码框架的学习都会本着这个原则)。


public class ThreadPoolExecutorTest {
	public static void main(String[] args) {
		ThreadPoolExecutorTest task = new ThreadPoolExecutorTest();
		//corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue
		ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 20, 300, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(4));
		for(int i=0;i < 15;i++){
			threadPool.execute(task.new MyTask(i));
		}
		threadPool.shutdown();
		System.out.println("end");
	}
	
	    public class MyTask implements Runnable {
		private int taskNo;
		public MyTask(int taskNO){
			this.taskNo = taskNO;
		}
		public void run(){
			System.out.println("任务:"+taskNo+"正在执行");
			try {
				Thread.currentThread().sleep(4000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println("任务:"+taskNo+"执行执行结束");
		}
	}
}

输出:

任务:3正在执行
任务:10正在执行
任务:1正在执行
任务:9正在执行
任务:0正在执行
任务:4正在执行
任务:2正在执行
任务:11正在执行
任务:13正在执行
任务:12正在执行
任务:14正在执行
end
任务:1执行执行结束
任务:5正在执行
任务:9执行执行结束
任务:6正在执行
任务:11执行执行结束
任务:7正在执行
任务:3执行执行结束
任务:8正在执行
任务:10执行执行结束
任务:12执行执行结束
任务:2执行执行结束
任务:13执行执行结束
任务:14执行执行结束
任务:0执行执行结束
任务:4执行执行结束
任务:6执行执行结束
任务:5执行执行结束
任务:8执行执行结束
任务:7执行执行结束

以上的范例就是ThreadPoolExecutor的简单应用,首先需要创建一个任务类MyTask ,其次在主方法里创建ThreadPoolExecutor对象,接着用for循环来模拟运行多个线程,然后execute方法执行,最后调用shutdown方法结束。
上面代码的实现和我们往常实现多线程有些区别,我们往常使用:

Thread threadA = new Thread();
thread.start();

来创建一个线程执行任务,在应用ThreadPoolExecutor时,我们不再自己创建,而是使用线程池为我们创建的线程。

在创建线程池ThreadPoolExecutor对象时,有很多个构造参数,通过注释我们可以了解到,这些参数就是用来设置线程池的特征的。


三丶从源码来看ThreadPoolExecutor

1)ThreadPoolExecutor结构:

public class ThreadPoolExecutor extends AbstractExecutorService {
	...
}

public abstract class AbstractExecutorService implements ExecutorService {
	...
}

public interface ExecutorService extends Executor{
	...
}

public interface Executor {
	...
}
//ForkJoinPool 也继承自AbstractExecutorService 
public class ForkJoinPool extends AbstractExecutorService {
	...
}

2)线程池处理任务处流程:
知道了ThreadPoolExecutor的继承关系后我们来了解一下ThreadPoolExecutor的设计结构和思想,这对我们后面理解ThreadPoolExecutor的源码有很大的帮助:
这里写图片描述
(对队列的判断应该在线程池内部)

上面的这张图应该就能大概的描述ThreadPoolExecutor的实现了,同时也能够理解ThreadPoolExecutor的构造参数了。

线程池在创建的时候会设置CorePoolSize,maximumPoolSize,workQueue等几个重要参数,CorePoolSize指的是核心线程池的大小,maximumPoolSize指的是线程池的线程数最大值,workQueue为线程池指定的阻塞队列。
处理流程:

  1. 主线程执行execute方法,提交任务到线程池,线程池判断核心线程池中的线程是否都在工作,如果不是则创建一个线程来执行新任务,如果都在工作,进入下一步。
  2. 判断工作队列是否已满,如果不满,则将新任务加入到阻塞队列中,如果满了进入下一步。
  3. 判断线程池中线程数是否小于maximumPoolSize,如果小于,创建新的线程来处理新任务,否则交给饱和策略。

3)源码:
接下来我们跟着方法的执行流程来跟源码:
源码从哪里开始跟?当然是从execute方法开始啦,毕竟这个是执行的开端呀(博主跟源码还是喜欢这样,这样慢慢跟下去,不太喜欢直接看构造,字段,方法,等看到了字段属性再跟下去看)。


3.1 execute:

public void execute(Runnable command) {
		//判断任务有效性
        if (command == null)
            throw new NullPointerException();
        //ctl是一个AtomicInteger类型数据
        //private final AtomicInteger ctl = 
        //new AtomicInteger(ctlOf(RUNNING, 0));
        //ctlOf方法下面是Runing代表的值和0的或操作
        //private static int ctlOf(int rs, int wc)
        // { return rs | wc; }
        //private static final int RUNNING = -1 << COUNT_BITS;
        //COUNT_BITS = Integer.SIZE - 3;
        //所以这个c就是RUNNING值的句柄,额,大动干戈了...
        int c = ctl.get();
        //如果运行的线程数小于corePoolSize
        if (workerCountOf(c) < corePoolSize) {
	        //如果线程池成功为command任务创建或分配新的线程
	        //addWorker方法boolean参数用来判断是否在核心池加任务
            if (addWorker(command, true))
                //退出程序
                return;
            //更新Runing值
            c = ctl.get();
        }
        //private static boolean isRunning(int c) 
        //{return c < SHUTDOWN;}
        //SHUTDOWN值为0,如果小于这个值,表示运行停止
        //offer用来判断任务是否成功入队
        if (isRunning(c) && workQueue.offer(command)) {
            //再次获取RUNNING值
            int recheck = ctl.get();
	        //如果command在任务队列中,remove方法将其移除
            if (! isRunning(recheck) && remove(command))
                //将command任务交给饱和策略
                reject(command);
            //如果程序遭到shutdown或shutdownNow方法停止,
            //那么这时会检测到无线程运行,这个时候不要添加任务处理
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果入队失败,那么交给饱和策略
        else if (!addWorker(command, false))
            reject(command);
    }

相信这个方法已经不要我再多作什么赘述了吧(捂嘴笑.jpg)。
这里我们再将ThreadPoolExecutor的一些字段列一下:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // 这几个方法我也将它们看做字段了
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    //总之,为了效率,源码的位运算常见的跟喝茶似的

如果没有对照源码看博文的小伙伴可能会有很多疑问,相信你打开源码再看我的方法注释会更好的理解!


3.2 addWorker:
看了上面的execute方法,相信我们可以看出来addWorker方法也算是核心了,addWorker方法担任了所有的将任务交给线程的操作:

   //addWorker方法两个参数,第一个参数不用说,第二个之前我们说过
   //它是用来区分任务是将送达的地方(是否是核心线程池)
   private boolean addWorker(Runnable firstTask, boolean core) {
		/*说明:其实retry就是一个标记,标记程序跳出循环的时候从哪里开始执行,
		*功能类似于goto。retry一般都是跟随者for循环出现,第一个retry的下面
		*一行就是for循环,而且第二个retry的前面一般是 continue或是 break。*/
        retry:
        for (;;) {
	        //获取RUNNING
            int c = ctl.get();
            //运行状态
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                //下面可以看出Boolean类型参数core 作用了
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        //这里有个疑问,如果上面一直死循环,即使通过break跳出循环,那么根据
        //Retry特性,岂不是还要执行死循环,然后往复循环?
		//哦,知道了,当不满足上面的各种条件的时候
		//不就不用执行上面的代码了吗,这个...思维僵化了...

        boolean workerStarted = false;
        boolean workerAdded = false;
        
        Worker w = null;
        try {
	        //从这里我们可以看出对线程的包装
            w = new Worker(firstTask);
            //这里创建了线程
            final Thread t = w.thread;
            //有效性判断
            if (t != null) {
	            //这里要加锁了
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //workers为HashSet
                        //是用来存放被包装过的工作线程
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //当添加成功后就要启动了
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }


从上面的代码我们可以发现,当任务交给线程执行的时候并不是直接的交给线程,线程池创建线程后会将线程封装成工作线程Worker,Worker工作完后还会继续去工作队列中获取任务来执行。


3.3 Worker类:

 private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable{
	...
}

我们可以从下面这段Worker类的Run方法中窥到这一点:
jdk1.8和1.7有很大的改动,有兴趣的朋友可以去对比一下

 public void run() {
            runWorker(this);
        }
   final void runWorker(Worker w) {
       Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
	        //下面这行是重点
	        //task不为空或者getTask(获取队列中任务)不为空的时候
	        //对这个任务加锁进行处理
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
	                    //这里运行
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

3.4 submit:
既然讲到了execute方法,怎么能少了submit方法,当我们执行一个任务的时候,有的时候需要返回值,这个时候我们就需要用到submit方法了。
其实我们通过源码可以发现submit方法内部也是调用execute方法,当调用submit方法的时候我们可以收到一个Future对象,我们可以调用Future对象的get方法来获得它的返回值。
关于Future的知识,可以参考:
https://www.cnblogs.com/cz123/p/7693064.html
注:我在ThreadPoolExecutor中找submit方法的时候没找到,然后才发现ThreadPoolExecutor是直接继承他的父类AbstractExecutorService的。

 public Future<?> submit(Runnable task) {
        //验证任务有效性
        if (task == null) throw new NullPointerException();
        
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

从上面的代码我们可以看出将任务包装成一个RunnableFuture对象,然后将这个对象用execute执行。
如果看过我上面推荐的博文,读者应该知道Callable和和Runnable的区别了:是否有返回值。

关于这个Future的结构我们来理一下:
FutureTask→(实现)RunnableFuture→(继承)Runnable,Future

我们可以看看newTaskFor方法:

 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

它的底层其实就是new一个FutureTask对象而已,所以FutureTask才是真正的实现类哦!
关于FutureTask的实现,我们会专门抽出时间去整理!


3.5shutdown和shutdownNow:
我们可以通过shutdown和shutdownNow方法来关闭线程池,shutdown方法通过遍历工作线程HashSet,将运行状态(ctl)这设置为SHUTDOWN并调用interrupt方法中断所有线程,shutdownNow同样遍历所有线程,将将运行状态(ctl)这设置为STOP,并调用interrupt方法中断所有线程。

   public void shutdown() {
		//需要加锁中断
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
	        //保证线程可中断
            checkShutdownAccess();
            //更改运行状态,底层为原子操作
            advanceRunState(SHUTDOWN);
            //这个方法会对全局变量workers(HashSet)进行遍历
            //对这个里的所有工作线程调用interrupt方法
            interruptIdleWorkers();
            //一个空方法
            //官方是这样说的:
            //used by ScheduledThreadPoolExecutor
            //to cancel delayed tasks.
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
	        //保证线程可中断
            checkShutdownAccess();
            //ctl设置为STOP
            advanceRunState(STOP);
            interruptWorkers();
            //这个就是两个方法区别
            //这个方法将工作队列中的任务(还未执行)
            //取出放到list中
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

3.6 RejectedExecutionHandler:
最后我们来看看这个饱和策略,当线程和队列都满了过后,表明这个线程池处于饱和的状态,那么我们必须要采取一定的措施来处理这些任务,在默认的情况下我们会执行AbortPolicy,表示无法处理任务,抛出异常。
同时JDK提供了以下的几种策略:

  • AbortPolicy:直接抛出异常
  • CallerRunsPolicy:用调用者的线程来执行任务
  • DiscardOldestPolicy:丢弃队列里最近的任务,并执行这个任务
  • DiscardPolicy:丢弃,不处理

我们也可以实现RejectedExecutionHandler接口进行自定义操作,例如有的时候我们需要将这种异常记录到日志当中,这个时候我们就需要自定义了!

总结:通过对线程池的学习,自己又了解到不少自己不知道的知识,例如Future接口等,也通过学习,发现一些容器和锁真的很常见,自己平时应用的时候很少碰见,当然,大师的编程水平也值得我们这些小民学习!

以上是关于从源码看JDK提供的线程池(ThreadPoolExecutor)的主要内容,如果未能解决你的问题,请参考以下文章

Java基础之-ExecutorService(线程池)

高并发通过ThreadPoolExecutor类的源码深度解析线程池执行任务的核心流程

高并发通过ThreadPoolExecutor类的源码深度解析线程池执行任务的核心流程

怎么才算掌握了JDK中的线程池

一文读懂JDK源码:ThreadPoolExecutor

Java线程池源码阅读