Java Review - 并发编程_ThreadPoolExecutor原理&源码剖析

Posted 小小工匠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java Review - 并发编程_ThreadPoolExecutor原理&源码剖析相关的知识,希望对你有一定的参考价值。


线程池主要解决两个问题

  • 一是当执行大量异步任务时线程池能够提供较好的性能。在不使用线程池时,每当需要执行异步任务时直接new一个线程来运行,而线程的创建和销毁是需要开销的。线程池里面的线程是可复用的,不需要每次执行异步任务时都重新创建和销毁线程。

  • 二是线程池提供了一种资源限制和管理的手段,比如可以限制线程的个数,动态新增线程等。每个ThreadPoolExecutor也保留了一些基本的统计数据,比如当前线程池完成的任务数目等。

另外,线程池也提供了许多可调参数和可扩展性接口,以满足不同情境的需要,程序员可以使用更方便的Executors的工厂方法,比如newCachedThreadPool(线程池线程个数最多可达Integer.MAX_VALUE,线程自动回收)、newFixedThreadPool(固定大小的线程池)和newSingleThreadExecutor(单个线程)等来创建线程池,当然用户还可以自定义。


类关系图

在上图中,Executors其实是个工具类,里面提供了好多静态方法,这些方法根据用户选择返回不同的线程池实例。


ctl 含义 ---- 记录线程池状态和线程池中线程个数

ThreadPoolExecutor继承了AbstractExecutorService,成员变量ctl是一个Integer的原子变量,用来记录线程池状态和线程池中线程个数,类似于ReentrantReadWriteLock使用一个变量来保存两种信息。

  private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

这里假设Integer类型是32位二进制表示,则其中高3位用来表示线程池状态,后面29位用来记录线程池线程个数。

/用来标记线程池状态(高3位),线程个数(低29位)
//默认是RUNNING状态,线程个数为0

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//线程个数掩码位数
private static final int COUNT_BITS = Integer.SIZE - 3;

//线程最大个数(低29位)00011111111111111111111111111111
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;


线程池状态:

//(高3位):11100000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;

//(高3位):00000000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;

//(高3位):00100000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;

//(高3位):01000000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;

//(高3位):01100000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;

// 获取高三位 运行状态
private static int runStateOf(int c)      return c & ~CAPACITY; 

//获取低29位 线程个数
private static int workerCountOf(int c)   return c & CAPACITY; 

//计算ctl新值,线程状态 与 线程个数
private static int ctlOf(int rs, int wc)  return rs | wc; 


线程池状态 及转换

  • RUNNING:接受新任务并且处理阻塞队列里的任务。

  • SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务。

  • STOP:拒绝新任务并且抛弃阻塞队列里的任务,同时会中断正在处理的任务。

  • TIDYING:所有任务都执行完(包含阻塞队列里面的任务)后当前线程池活动线程数为0,将要调用terminated方法。

  • TERMINATED:终止状态。terminated方法调用完成以后的状态。

线程池状态转换列举如下。

  • RUNNING -> SHUTDOWN :显式调用shutdown()方法,或者隐式调用了finalize()、方法里面的shutdown()方法。

  • RUNNING 或 SHUTDOWN)-> STOP :显式调用 shutdownNow()方法时。

  • SHUTDOWN -> TIDYING :当线程池和任务队列都为空时。

  • STOP -> TIDYING :当线程池为空时。

  • TIDYING -> TERMINATED: 当 terminated() hook 方法执行完成时


线程池参数

  • corePoolSize:线程池核心线程个数。

  • workQueue:用于保存等待执行的任务的阻塞队列, 比如基于数组的有界ArrayBlockingQueue、基于链表的无界LinkedBlockingQueue、最多只有一个元素的同步队列SynchronousQueue及优先级队列PriorityBlockingQueue等。

  • maximunPoolSize:线程池最大线程数量。

  • ThreadFactory:创建线程的工厂。

  • RejectedExecutionHandler:饱和策略,当队列满并且线程个数达到maximunPoolSize后采取的策略。

    比如
    AbortPolicy(抛出异常)、
    CallerRunsPolicy(使用调用者所在线程来运行任务)、
    DiscardOldestPolicy(调用poll丢弃一个任务,执行当前任务)
    DiscardPolicy(默默丢弃,不抛出异常)

  • keeyAliveTime:存活时间。如果当前线程池中的线程数量比核心线程数量多,并且是闲置状态,则这些闲置的线程能存活的最大时间。

  • TimeUnit:存活时间的时间单位


线程池类型

  • newFixedThreadPool :创建一个核心线程个数和最大线程个数都为nThreads的线程池,并且阻塞队列长度为Integer.MAX_VALUE。keeyAliveTime=0说明只要线程个数比核心线程个数多并且当前空闲则回收
    public static ExecutorService newFixedThreadPool(int nThreads) 
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    
	// 使用自定义线程创建工厂
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) 
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    
  • newSingleThreadExecutor: 创建一个核心线程个数和最大线程个数都为1的线程池,并且阻塞队列长度为Integer.MAX_VALUE。keeyAliveTime=0说明只要线程个数比核心线程个数多并且当前空闲则回收。
    public static ExecutorService newSingleThreadExecutor() 
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    

    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) 
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    
  • newCachedThreadPool :创建一个按需创建线程的线程池,初始线程个数为0,最多线程个数为Integer.MAX_VALUE,并且阻塞队列为同步队列。keeyAliveTime=60说明只要当前线程在60s内空闲则回收。这个类型的特殊之处在于,加入同步队列的任务会被马上执行,同步队列里面最多只有一个任务。
  public static ExecutorService newCachedThreadPool() 
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) 
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    

mainLock & termination

    /**
     * Lock held on access to workers set and related bookkeeping.
     * While we could use a concurrent set of some sort, it turns out
     * to be generally preferable to use a lock. Among the reasons is
     * that this serializes interruptIdleWorkers, which avoids
     * unnecessary interrupt storms, especially during shutdown.
     * Otherwise exiting threads would concurrently interrupt those
     * that have not yet interrupted. It also simplifies some of the
     * associated statistics bookkeeping of largestPoolSize etc. We
     * also hold mainLock on shutdown and shutdownNow, for the sake of
     * ensuring workers set is stable while separately checking
     * permission to interrupt and actually interrupting.
     */
    private final ReentrantLock mainLock = new ReentrantLock();

   /**
     * Wait condition to support awaitTermination
     */
    private final Condition termination = mainLock.newCondition();
  • mainLock是独占锁,用来控制新增Worker线程操作的原子性。

  • termination是该锁对应的条件队列,在线程调用awaitTermination时用来存放阻塞的线程。


Worker

Worker继承AQS和Runnable接口,是具体承载任务的对象。Worker继承了AQS,自己实现了简单不可重入独占锁,其中state=0表示锁未被获取状态,state=1表示锁已经被获取的状态,state=-1是创建Worker时默认的状态,创建时状态设置为-1是为了避免该线程在运行runWorker()方法前被中断。其中变量firstTask记录该工作线程执行的第一个任务,thread是具体执行任务的线程。

DefaultThreadFactory是线程工厂,newThread方法是对线程的一个修饰。其中poolNumber是个静态的原子变量,用来统计线程工厂的个数,threadNumber用来记录每个线程工厂创建了多少线程,这两个值也作为线程池和线程的名称的一部分。


源码分析

public void execute(Runnable command)

execute方法的作用是提交任务command到线程池进行执行。用户线程提交任务到线程池的模型图如下图所示。

从该图可以看出,ThreadPoolExecutor的实现实际是一个生产消费模型,当用户添加任务到线程池时相当于生产者生产元素,workers线程工作集中的线程直接执行任务或者从任务队列里面获取任务时则相当于消费者消费元素。

用户线程提交任务的execute方法的具体代码如下

public void execute(Runnable command) 

		// 1  任务为null ,抛出 npe异常
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        
        // 2 获取当前线程池的状态 + 线程个数变量的组合值 
        int c = ctl.get();
		// 3 当前线程池中的个数是否小于corePoolSize ,小于的话则开启新的线程 
        if (workerCountOf(c) < corePoolSize) 
            if (addWorker(command, true))
                return;
            c = ctl.get();
        
        // 4 如果线程池处于running状态,则添加任务到阻塞队列
        if (isRunning(c) && workQueue.offer(command)) 
        	// 4.1 二次检查
            int recheck = ctl.get();
            // 4.2 如果当前线程池的状态不是running 则从队列中移除任务,并执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 4.3 否则当线程数数量为空,则添加一个线程    
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        	
        // 5 如果队列满,则新增线程,新增线程失败,触发拒绝策略 
        else if (!addWorker(command, false))
            reject(command);
    
  • 代码(3)判断如果当前线程池中线程个数小于corePoolSize,会向workers里面新增一个核心线程(core线程)执行该任务。

  • 如果当前线程池中线程个数大于等于corePoolSize则执行代码(4)。如果当前线程池处于RUNNING状态则添加当前任务到任务队列。这里需要判断线程池状态是因为有可能线程池已经处于非RUNNING状态,而在非RUNNING状态下是要抛弃新任务的。

  • 如果向任务队列添加任务成功,则代码(4.2)对线程池状态进行二次校验,这是因为添加任务到任务队列后,执行代码(4.2)前有可能线程池的状态已经变化了。这里进行二次校验,如果当前线程池状态不是RUNNING了则把任务从任务队列移除,移除后执行拒绝策略;如果二次校验通过,则执行代码(4.3)重新判断当前线程池里面是否还有线程,如果没有则新增一个线程。

  • 如果代码(4)添加任务失败,则说明任务队列已满,那么执行代码(5)尝试新开启线程(如上的thread3和thread4)来执行该任务,如果当前线程池中线程个数>maximumPoolSize则执行拒绝策略。

新增线程addWorkder源码分析

/**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread.start()), we roll back cleanly.
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) 
        retry:
        for (;;) 
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary. 
            // 6 检查队列是否只在必要的时候为空
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
			
			// 7 循环CAS增加线程个数 
            for (;;) 
                int wc = workerCountOf(c);
                // 7.1 如果线程个数超过限制 则返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 7.2 cas增加线程个数,同时只能有1个线程成功  
                if (compareAndIncrementWorkerCount(c))
                    break retry;
               // 7.3 cas失败了,则看线程池状态是否变化了,变化则跳到外层循环重试重新获取线程池状态,否者内层循环重新cas。
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            
        
		
		// 8 到这里,说明CAS成功了
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try 
        	// 8.1 创建Worker
        	 w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) 
            	// 8.2 加独占锁,为了workers同步,因为可能多个线程调用了线程池的execute方法。
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try 
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    //8.3 重新检查线程池的状态,避免在获取锁前调用了shutdown接口
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) 
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 8.4 添加任务
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    
                 finally 
                    mainLock.unlock();
                
                // 8.5 添加成功,则启动线程
                if (workerAdded) 
                    t.start();
                    workerStarted = true;
                
            
         finally 
            if (! workerStarted)
                addWorkerFailed(w);
        
        return workerStarted;
    

主要分两个部分:

  • 第一部分双重循环的目的是通过CAS操作增加线程数;
  • 第二部分主要是把并发安全的任务添加到workers里面,并且启动任务执行。

首先来分析第一部分的代码6

    // 6 检查队列是否只在必要的时候为空
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

展开!运算后等价于

rs >= SHUTDOWN &&
               (rs != SHUTDOWN ||
             firstTask != null ||
             workQueue.isEmpty())

也就是说下面几种情况下会返回false:

  • 当前线程池状态为STOP,TIDYING,TERMINATED
  • 当前线程池状态为SHUTDOWN并且已经有了第一个任务
  • 当前线程池状态为SHUTDOWN并且任务队列为空

内层循环的作用是使用CAS操作增加线程数,代码(7.1)判断如果线程个数超限则返回false,否则执行代码(7.2)CAS操作设置线程个数,CAS成功则退出双循环,CAS失败则执行代码(7.3)看当前线程池的状态是否变化了,如果变了,则再次进入外层循环重新获取线程池状态,否则进入内层循环继续进行CAS尝试。

执行到第二部分的代码(8)时说明使用CAS成功地增加了线程个数,但是现在任务还没开始执行。这里使用全局的独占锁来控制把新增的Worker添加到工作集workers中。代码(8.1)创建了一个工作线程Worker。

代码(8.2)获取了独占锁,代码(8.3)重新检查线程池状态,这是为了避免在获取锁前其他线程调用了shutdown关闭了线程池。如果线程池已经被关闭,则释放锁,新增线程失败,否则执行代码(8.4)添加工作线程到线程工作集,然后释放锁。代码(8.5)判断如果新增工作线程成功,则启动工作线程。


工作线程Worker的执行

用户线程提交任务到线程池后,由Worker来执行。先看下Worker的构造函数。

		/**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) 
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //创建一个线程
            this.thread = getThreadFactory().newThread(this);
        

在构造函数内首先设置Worker的状态为-1,这是为了避免当前Worker在调用runWorker方法前被中断(当其他线程调用了线程池的shutdownNow时,如果Worker状态>=0则会中断该线程)。这里设置了线程的状态为-1,所以该线程就不会被中断了。在如下runWorker代码中,运行代码(9)时会调用unlock方法,该方法把status设置为了0,所以这时候调用shutdownNow会中断Worker线程。

  /** Delegates main run loop to outer runWorker  */
        public void run() 
            runWorker(this);
        
  final void runWorker(Worker w) 
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 9 将state 置为0 ,允许终端
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try 
        	// 10 
	            while (task != null || (task = getTask()) != null) 
            	// 10.1 
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try 
                	// 10.2 执行任务前干一些事情
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try 
                    	// 10.3 执行任务
                        task.run以上是关于Java Review - 并发编程_ThreadPoolExecutor原理&源码剖析的主要内容,如果未能解决你的问题,请参考以下文章

Java Review - 并发编程_Unsafe

Java Review - 并发编程_Unsafe

Java Review - 并发编程_前置知识二

Java Review - 并发编程_抽象同步队列AQS

Java Review - 并发编程_ 回环屏障CyclicBarrier原理&源码剖析

Java Review - 并发编程_ThreadPoolExecutor原理&源码剖析