Java并发编程(十八):ThreadPoolExecutor总结与源码深度分析

Posted 黄智霖-blog

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发编程(十八):ThreadPoolExecutor总结与源码深度分析相关的知识,希望对你有一定的参考价值。

目录

前言

  JUC中包含了很多的工具类,不论是第三方工具包,还是自己开发的业务系统,使用最多的还是是线程池,比如普通的线程池ThreadPoolExecutor、带调度的线程池ScheduledThreadPoolExecutor等,本文就从源码层面探究一下ThreadPoolExecutor的实现原理~
  另外,本文涉及的ReentrantLock和条件队列,请参考:ReentrantLock源码深度分析CyclicBarrier源码深度分析

ThreadPoolExecutor总结

构造参数

  • int corePoolSize:"核心"线程数,提交任务时如果当前工作线程数小于核心线程数,会直接创建新的线程执行任务
  • int maximumPoolSize:线程池能容纳的最大线程数,提交任务时如果工作线程数达到了核心线程数并且队列已满,但是线程数小于max,则会创建新的线程执行任务。
  • long keepAliveTime:非核心线程最长空闲时间,如果非核心线程超过空闲时间没有执行任务,则会结束。allowCoreThreadTimeOut参数可控制是否其对核心线程生效
  • TimeUnit unit:keepAliveTime的单位,比如秒、毫秒等
  • BlockingQueue workQueue:阻塞队列,添加任务时,如果已创建线程数达到了核心线程数,任务会被尝试放到该队列
  • ThreadFactory threadFactory:线程工厂,线程池中的线程都通过该工厂创建
  • RejectExecutionHandler handler:拒绝策略,线程池无法接收任务后,继续添加任务触发的行为

注:下图是一个简单总结,没有包含所有情况

注:所谓的核心线程,并没有什么标识字段去区分线程是否是核心,线程就是线程,在线程池中都是一样的,区别就是线程创建的时机。可以这样理解,线程池有个定义的初始线程数量(corePoolSize),这些线程并不是线程池初始化的时候就创建好的,而是基于饱汉模式,提交任务的时候才创建线程,当线程达到指定数量(corePoolSize)后,继续添加的任务需要有个队列(BlockingQueue)存储下来,一个线程完成一个任务后可以从这个队列中获取任务继续执行,而如果这个队列也存储满了呢?这里提供了一种伸缩机制,如果线程数量达到了corePoolSize上限,并且队列也满了,那么允许再创建一些线程,这些线程就像公司的临时工一样(之前的线程就是正式员工),临时工可以在公司超负荷的时候过来帮帮忙,当然临时工的数量也要有个上限,那么正式员工+临时工的最大数量就是maximumPoolSize,既然是临时工,那么在公司没有那么忙的时候就可以解除雇佣了,所以通过keepAliveTime参数来进行控制,如果一个临时工的空闲时间达到了这个阈值,那么就认为不再需要它了,就将其解雇,如果后期公司又超负荷了,就继续招聘临时工。当然,如果公司领导为了节约成本,当公司没有那么忙的时候,连正式员工也想一起解雇,那么可以通过设置allowCoreThreadTimeOut参数为true达到目的。
再次强调,线程本身没有核心属性,这只是概念上的说法,具体到代码中都是根据线程数量和队列状态来确定的,很可能这个名词都是根据Doug Lean对变量的命名翻译得来~

线程池状态

  在线程池中存在定义了5种状态:

	// runState is stored in the high-order bits
	//运行状态,能够接收新的任务,并且线程池中的线程正在处理已添加的任务(默认状态)
    private static final int RUNNING    = -1 << COUNT_BITS;
    //关闭状态,不能接收新的任务,但是能处理已经添加的任务(shutdown()方法)
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    //停止状态,不能接收新的任务,也不会继续处理已经添加的任务,会中断(interrupt)正在执行的任务(shutdownNow()方法)
    //如果任务不能响应中断信号,线程不会退出
    private static final int STOP       =  1 << COUNT_BITS;
    //终止状态,所有任务已经被终止,workerCount为0,当线程池会变为TIDYING状态,然后会执行terminated方法(该方法可由子类重写)
    private static final int TIDYING    =  2 << COUNT_BITS;
    //变为TIDYING状态后会执行terminated()方法,该方法执行完成后状态正式变为TERMINATED
	//如果需要,该方法可以在子类进行重写,在ThreadPoolExecutor中是空方法
    private static final int TERMINATED =  3 << COUNT_BITS;
  • RUNNING:能够接受新的任务,也会处理队列中的任务。线程池初始为此状态。
  • SHUTDOWN:不能接受新的任务,会处理队列中的任务。调用shutdown方法,会变为此状态:RUNNING->SHUTDOWN。
  • STOP:不能接受新的任务,也不会执行队列中的任务,会中断正在执行的任务,会将队列中未执行的任务返回。调用shutdownNow方法会变为此状态:RUNNING(SHUTDOWN)->STOP。
  • TIDYING:线程池终止的前置状态,到此状态时,表示所有任务已经被终止,workerCount为0。线程shutdown和shutdownNow都可能触发变更为此状态,但是具有一定的条件:shutdown时(也就是SHUTDOWN状态),要求队列和当前执行任务都为空;shutdownNow(也就是STOP状态),要求当前执行任务为空。线程池变为TIDYING状态后会执行terminate()方法,该方法在ThreadPoolExecutor中是一个空方法,由子类根据需要重写。
  • TERMINATED:线程池终止的最终状态,在terminate()方法执行完成(即使抛出异常)后设置。

  从定义中可以看到,一个线程池初始为RUNNING状态,调用shutdown方法被调用后会进入SHUTDOWN状态,并且尝试中断空闲的线程,调用shutdownNow后会进入STOP状态,并且尝试中断所有线程。不论是shutdown还是shutdownNow,都不能保证线程池中的线程立即退出,因为不是所有的线程都能响应中断(interrupt)。不论是shutdown还是shutdownNow,都会执行tryTerminate()方法,该方法逻辑会根据线程池状态和工作线程数量判断是否能够终止线程池,TIDYING相当于是TERMINATED的一个前置状态,线程池在终止之前会先变为TIDYING状态,然后调用terminate()方法,最后变为TERMINATED状态。

数据存储

  在线程池的源码中,使用一个AtomicInteger类型的ctl属性来同时表示工作线程数量(workerCount)和线程池状态:低29位保存workerCount,高3位(加低29位0)保存线程池状态。ctl相关的核心定义和操作函数定义如下:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
	//Integer.SIZE == 32
	//COUNT_BITS == 29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //1左移29位,再减1,就是29个1,这个也是线程池允许的最大工作线程数量
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;//101 000...(29个0)
    private static final int SHUTDOWN   =  0 << COUNT_BITS;//000 000...(29个0)
    private static final int STOP       =  1 << COUNT_BITS;//001 000...(29个0)
    private static final int TIDYING    =  2 << COUNT_BITS;//010 000...(29个0)
    private static final int TERMINATED =  3 << COUNT_BITS;//011 000...(29个0)

    //低29位全部置0,获取线程池状态
    private static int runStateOf(int c)      return c & ~CAPACITY; 
    
    //取低29位,获取workerCount
    private static int workerCountOf(int c)   return c & CAPACITY; 
    
    //组合线程池状态和workerCount为一个完整的ctl
    //比如ctlOf(RUNNING,0),表示RUNNING状态,workerCount为0
    private static int ctlOf(int rs, int wc)  return rs | wc; 

  由于使用29位来保存工作线程数量,那么线程池中允许的最大工作线程数就是(1<<29) - 1 = 536870911。

源码分析

  首先是构造函数:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    

  对应的就是前面提到的几个参数(除了workQueue,都使用了volatile修饰,保证了修改对其它线程能即时可见),对参数做了一些检查和处理,比如corePoolSize和maximumPoolSize不能小于0等等,这个没有什么逻辑在里面,我们直接来到提交任务的实现。
  通过ThreadPoolExecutor提交任务大体上来说有两种方式,分别是:

  • submit:向线程池提交一个Runnable或Callable任务,返回一个Future,可以通过Future获取任务执行结果
  • execute:向线程池提交一个Runnable任务,没有返回值

  这里先看看execute的逻辑:

public void execute(Runnable command) 
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) 
        	//如果线程数小于核心线程数,那么尝试创建一个新的线程来执行提交的任务
            if (addWorker(command, true))
                return;
            c = ctl.get();
        
        //到这里说明工作线程数大于等于核心线程数(条件判断发现或者并发创建线程失败)
        //检查线程池是否为RUNNING状态,尝试将任务放到阻塞队列中
        if (isRunning(c) && workQueue.offer(command)) 
        	//如果任务入队成功,会再次检查线程池状态,如果线程池不是RUNNING状态
        	//会移除刚添加的任务,调用拒绝策略
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
            	//调用拒绝策略
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        
        //到这里说明线程池不是RUNNING状态,或者任务入队失败,那么尝试创建线程执行任务
        else if (!addWorker(command, false))
        	//如果创建失败,那么调用拒绝策略
            reject(command);
    

这里简单提一点,addWorker方法的核心逻辑是创建一个线程来执行任务,它有两个入参,一个是command表示提交的任务,一个布尔类型的core表示创建线程判断是以corePoolSize作为标准,还是以maximumPoolSize作为标准,下面会贴出相应的源码。

  从这个逻辑中我们看到了线程池的大体处理逻辑:提交任务时如果工作线程数小于核心线程数,那么直接创建新的线程处理任务,如果已经达到核心线程数,那么将任务放到阻塞队列中,如果队列已满,入队失败,那么判断工作线程是否达到maximumPoolSize,如果没达到那么创建新的线程执行任务,否则调用拒绝策略拒绝任务。

注:可以看到,拒绝策略的调用不一定是线程数和队列满了,也可能是线程池已经不是RUNNING状态


  在上面的execute方法中,我们直接就看到了线程池工作的大体逻辑,其中有一个核心的addWorker()方法,该方法的主要作用是在线程池中创建一个新的工作线程来执行任务,注意如果线程创建成功,那么会首先执行此次提交的任务,当次任务完成后才会去从队列中拿。addWorker方法逻辑如下:

    private boolean addWorker(Runnable firstTask, boolean core) 
        retry:
        for (;;) 
            int c = ctl.get();
            int rs = runStateOf(c);
            //SHUTDOWN以后的状态都不允许提交新的任务,但是SHUTDOWN状态允许执行队列中的任务
            //如果当前提交的任务(firstTask)为空,并且队列不为空,并且是SHUTDOWN状态,那么可以在后面逻辑中尝
            //试创建一个线程去帮忙执行队列中的任务
            //这正是签名定义中对SHUTDOWN状态的描述
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                //返回false,调用拒绝策略
                return false;

			//这里自旋,在的多线程并发竞争的情况下,要保证每个线程要么新增工作线程成功,要么失败返回false
            for (;;) 
           		//获取到工作线程数量
                int wc = workerCountOf(c);
                //要创建线程,首先要保证工作线程数量不能超过CAPACITY(也就是29个1)
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    //根据core入参来决定使用corePoolSize还是maximumPoolSize作为创建线程的依据
                    return false;
				//通过CAS递增一个workerCount,如果操作成功,那么跳出多层循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                
                //说明CAS递增workerCount失败了,那么重新获取ctl然后重试
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                	//线程池状态发生了变更,那么回到顶层自旋重试
                    continue retry;
                //如果线程池状态没有发生变更,那么不用回到顶层for循环去判断线程池状态
            
        

		//到这里说明可以创建一个新的工作线程
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try 
        	//创建一个Worker,它封装了Thread,并且传入了firstTask
        	//只是创建,不一定会开启线程,下面会再次检查
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) 
                final ReentrantLock mainLock = this.mainLock;
                //同步锁
                mainLock.lock();
                try 
                    //获取当前线程池状态
                    int rs = runStateOf(ctl.get());
                    //再次检查是否应该开启线程
                    //如果线程池状态小于SHUTDOWN(也就是RUNNING)或者等于SHUTDOWN但是此次提交的任务为空
                    //那么都会真正开启一个任务
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) 
                        //新建多线程来自于threadFactory,这里要检查线程的状态
                        //如果已经开启,那么抛出异常
                        if (t.isAlive()) 
                            throw new IllegalThreadStateException();
                      	//到这里才表示工作线程创建成功
                      	//添加到工作线程集合中(HashSet结构)
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                        	//这里记录工作线程的历史最大数量
                            largestPoolSize = s;
                        //标识工作线程添加成功
                        workerAdded = true;
                    
                 finally 
                    mainLock.unlock();
                
                if (workerAdded) 
                	//如果工作线程添加成功,那么在这里开启线程
                    t.start();
                    //标识线程已经启动
                    workerStarted = true;
                
            
         finally 
            if (! workerStarted)
            	//如果线程没有启动,表示工作线程其实添加失败了,需要在addWorkerFailed方法里回滚
                addWorkerFailed(w);
        
        //返回线程是否成功启动
        return workerStarted;
    

  addWorder方法的逻辑大致可以分为两大部分:

  • 第一是确定此次提交任务能否在线程池中创建一个新的工作线程。通过两个for循环实现 ,第一层for循环主要是判断线程池的状态,第二层for循环主要是根据当前工作线程数量和限制数量(corePoolSize或maximumPoolSize)判断能否新增工作线程(CAS递增workerCount,失败的话一直重试)
  • 第二是创建工作线程。首先会通过threadFactory创建一个线程,该线程包装在一个Worker对象中,并且该Worker默认绑定的是当前提交的任务,即使首先创建了Worker,在加锁之后还会再次检查线程池状态,可能检查的结果不允许开启新的工作线程,那么会在addWorkerFailed方法中进行回滚

  现在我们从上述addWorker方法的代码逻辑中可以得出几个关键点:

  • 向线程池中添加工作线程的核心逻辑使用了ReentrantLock做同步控制
  • 线程池中的工作线程都是在提交任务的时候才触发创建的,并且默认绑定当前提交的任务
  • 如果线程池处于SHUTDOWN状态,但是阻塞队列中还有任务待执行,那么还可以通过提交一个null任务尝试去新建一个工作线程帮忙处理,当然能否创建还是要根据当前线程池状态和参数设置来判断
  • 工作线程包装在Worker对象中,并且默认传入了当前提交的任务firstTask,Worker存储在一个HashSet集合中。所以如果核心线程和阻塞队列满了,这时候提交任务触发创建的工作线程不是直接从阻塞队列中获取任务,而是先执行此次提交的任务,此次任务执行完才回去从队列中取任务
  • 由于线程是由线程工厂ThreadFactory创建的,这个工厂可以由开发人员自己指定,所以需要判断创建的线程是否已经启动了,如果已经启动,则会抛出IllegalThreadStateException异常
  • 即使已经创建好了线程(Worker),也要再次检查线程池当前状态,如果不符合开启线程的条件(参考前面线程池各个状态的定义),那么不会真正开启线程,表示工作线程创建失败了,需要在addWorkerFailed方法中进行回滚,比如对workerCount做减1(因为前面已经对workerCount加了1)

  在addWorker中的逻辑我们已经知道工作线程被包装在Worker对象中,那么我们来看看Worker的构造函数:

Worker(Runnable firstTask) 
	setState(-1); //默认设置state为-1,因为如果Worker刚创建,还没有执行任务,那么不能被中断
	this.firstTask = firstTask;
	this.thread = getThreadFactory().newThread(this);

  这里注意到,创建线程传入的Runnable是this,这个this就是Worker对象本身,Worker实现了Runnable接口:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable 
	public void run() 
		runWorker(this);
	

  可以看到Worker本身也是继承了AQS的,它重写了tryAcquire方法:

		protected boolean tryAcquire(int unused) 
            if (compareAndSetState(0, 1)) 
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            
            return false;
        
        protected boolean tryRelease(int unused) 
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        
		//调用的父类AQS的acquire,如果竞争失败,会陷入阻塞
        public void lock()         acquire(1); 
        //直接调用本类重写的tryAcquire,竞争失败直接返回false
        public boolean tryLock()   return tryAcquire(1); 
        //调用父类AQS的release解锁
        public void unlock()       release(1); 

  逻辑很简单,它不允许线程重入,并且提供了不会阻塞线程的tryLock方法(该方法在后续中断空闲线程的时候有用),所以这里没有使用ReentrantLock和Synchronized这些工具。Worker在初始化的时候会将state设置为-1,此时如果调用tryLock方法会返回false,因为其CAS的期望值是0,以此达到Worker刚刚创建还没有执行任务的时候不能被中断的目的。

Worker对应的线程在执行任务的时候会通过自身调用lock方法获取独占锁,在任务完成之后会释放独占锁,所以获取到了独占锁就表示线程获取到了需要执行的任务,在后续中断空闲线程的时候就不能中断这些非空闲的线程,调用tryLock方法返回false就可表示worker线程正在执行任务。但是由于Worker创建的时候初始化state为-1,那么tryLock返回false还可能表示Worker刚刚创建,后面会在runWorker方法中主动调用unlock方法将其设置为0,表示可以被中断。

  Worker是ThreadPoolExecutor的内部类,在其实现的run方法中调用的runWorker方法定义在ThreadPoolExecutor中,所以这里创建的线程会通过Worker的run方法调用外部类ThreadPoolExecutor的runWorker方法,并且将worker实例传入,那么我们就来看看runWorker方法:

final void runWorker(Worker w) 
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        //由于Worker在初始化的时候将state设置为-1,表示不允许中断
        //这里调用unlock方法将其设置为0(看看上面Worker重写的tryRelease方法),表示可以被中断
        w.unlock();
        boolean completedAbruptly = true;
        try 
        	//task默认是创建Worker时传入的任务,如果该任务为空,那么调用getTask()方法从队列中获取任务
            while (task != null || (task = getTask()) != null) 
            	//通过Worker的lock方法加锁,一旦线程获取了这个独占锁,那么表示线程正在工作中
            	//任务完成会在finally代码块中unlock释放锁
                w.lock();
                // 1.线程池可能正在终止过程中(stater>=STOP),如果正在终止那么需要保证当前线程是中断状态
                // 2.如果线程池没有终止,那么要保证线程不是中断状态。但是这种情况下需要重新判断线程池状态,并且清除线程中断标记,因为在if					                  	
                // 语句期间,可能线程池调用了shutdownNow方法,该方法不会调用worker.lock方法,所以这是可能发生的
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    //只是中断线程,不是退出线程,如何响应中断要看任务自身如何处理
                    wt.interrupt();
                try 
                	//空方法,交给子类实现
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try 
                    	//真正执行任务
                        task.run();
                     catch (RuntimeException x) 
                        thrown = x; throw x;
                     catch (Error x) 
                        thrown = x; throw x;
                     catch (Throwable x) 
                        thrown = x; throw new Error(x);
                     finally 
                  		//空方法,交给子类实现
                        afterExecute(task, thrown);
                    
                 finally 
                    task = null;
                    //任务完成数量加1,解锁
                    w.completedTasks++Java并发编程系列之二十八:CompletionService

Java并发编程(十八):ThreadPoolExecutor总结与源码深度分析

Java并发编程(十八):ThreadPoolExecutor总结与源码深度分析

Java并发编程原理与实战三十八:多线程调度器(ScheduledThreadPoolExecutor)

转: Java并发编程之十八:第五篇中volatile意外问题的正确分析解答(含代码)

Python之路(第三十八篇) 并发编程:进程同步锁/互斥锁信号量事件队列生产者消费者模型