「超详细」Java线程池源码解析

Posted 倾听铃的声

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了「超详细」Java线程池源码解析相关的知识,希望对你有一定的参考价值。

绕不开的线程池

只看ThreadPoolExecutor的英文语义就能知道这是一个与线程池有关的类。

关于线程池,搞过开发的肯定都知道,也都能或多或少讲出相关知识;尽管如此,作者在还是想要不厌其烦的给大家加深加深记忆

线程池是一种池化技术,Java中类似的池化技术有很多,
常见的有:

  • 数据库连接池
  • redis连接池
  • http连接池
  • 内存池
  • 线程池

池化技术的作用:把一些能够复用的东西(比如说连接、线程)放到初始化好的池中,便于资源统一管理。
这样做的好处:

避免重复创建、销毁、调度的开销,提高性能 保证内核的充分利用,防止过分调度 自定义参数配置达到最佳的使用效果

ThreadPoolExecutor 知识点

Java中创建线程池的方法

不推荐

通过Executors类的静态方法创建如下线程池

  • FixedThreadPool (固定个数)
  • ScheduledThreadPool (执行周期性任务)
  • WorkStealingPool (根据当前电脑CPU处理器数量生成相应线程数)
  • CachedThreadPool (带缓存功能)
  • SingleThreadPool (单个线程)

 

推荐

通过ThreadPoolExecutor创建线程池

    // 给线程定义有业务含义的名称
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread-pool-%s").build();
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            5,  // 线程池核心线程数
            10,  // 线程池最大线程数,达到最大值后线程池不会再增加线程
            1000,  // 线程池中超过corePoolSize数目的空闲线程最大存活时间
            TimeUnit.MILLISECONDS,  // 时间单位,毫秒
            new LinkedBlockingQueue<>(50),  // 工作线程等待队列
            threadFactory,  // 自定义线程工厂
            new ThreadPoolExecutor.AbortPolicy());  // 线程池满时的拒绝策略
复制代码

为什么

先来看看阿里巴巴出品的《Java开发手册》中怎么说?

 

再来看看源码怎么写? 

以SingleThreadPool为例,其实现也是通过ThreadPoolExecutor的构造方法创建的线程池, 之所以不推荐的原因是其使用了LinkedBlockingQueue作为工作线程的等待队列,其是一种无界缓冲等待队列,该队列的默认构造器定义的长度为Integer.MAX_VALUE 

FixedThreadPool同理 

CachedThreadPool采用了SynchronousQueue队列,也是一种无界无缓冲等待队列,而且其最大线程数是Integer.MAX_VALUE 

ScheduledThreadPool采用了DelayedWorkQueue队列,是一种无界阻塞队列,其最大线程数是Integer.MAX_VALUE 

以上四种线程池都有OOM的风险
相反,在使用ThreadPoolExecutor时,我们可以指定有界/无界阻塞队列,并指定初始长度。

ThreadPoolExecutor源码分析

线程池生命周期

 

Tips:千万不要把线程池的状态和线程的状态弄混了。补一张网上的线程状态图 

Tips:当线程调用start(),线程在JVM中不一定立即执行,有可能要等待操作系统分配资源,此时为READY状态,当线程获得资源时进入RUNNING状态,才会真正开始执行。

拒绝策略

  • CallerRunsPolicy(在当前线程中执行)

 

 

  • AbortPolicy(直接抛出RejectedExecutionException)

 

  • DiscardPolicy(直接丢弃线程)
  • DiscardOldestPolicy(丢弃一个未被处理的最久的线程,然后重试)

 

当没有显示指明拒绝策略时,默认使用AbortPolicy 

 

ThreadPoolExecutor类图

通过IDEA的Diagrams工具查看UML类图,继承关系一目了然

 

ThreadPoolExecutor类中的方法很多,最核心就是构造线程池的方法和执行线程任务的方法,先前已经给出了标准的构造方法,接下来就讲一讲如何执行线程任务...

任务执行机制

  • 通过执行execute方法

该方法无返回值,为ThreadPoolExecutor自带方法,传入Runnable类型对象

 

  • 通过执行submit方法

该方法返回值为Future对象,为抽象类AbstractExecutorService的方法,被ThreadPoolExecutor继承,其内部实现也是调用了接口类Executor的execute方法,通过上面的类图可以看到,该方法的实现依然是ThreadPoolExecutor的execute方法

 

 

execute()执行流程图 

execute()源码解读

    // 使用原子操作类AtomicInteger的ctl变量,前3位记录线程池的状态,后29位记录线程数
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // Integer的范围为[-2^31,2^31 -1], Integer.SIZE-3 =32-3= 29,用来辅助左移位运算
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 高三位用来存储线程池运行状态,其余位数表示线程池的容量
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // 线程池状态以常量值被存储在高三位中
    private static final int RUNNING    = -1 << COUNT_BITS; // 线程池接受新任务并会处理阻塞队列中的任务
    private static final int SHUTDOWN   =  0 << COUNT_BITS; // 线程池不接受新任务,但会处理阻塞队列中的任务
    private static final int STOP       =  1 << COUNT_BITS; // 线程池不接受新的任务且不会处理阻塞队列中的任务,并且会中断正在执行的任务
    private static final int TIDYING    =  2 << COUNT_BITS; // 所有任务都执行完成,且工作线程数为0,将调用terminated方法
    private static final int TERMINATED =  3 << COUNT_BITS; // 最终状态,为执行terminated()方法后的状态

    // ctl变量的封箱拆箱相关的方法
    private static int runStateOf(int c)      return c & ~CAPACITY;  // 获取线程池运行状态
    private static int workerCountOf(int c)   return c & CAPACITY;  // 获取线程池运行线程数
    private static int ctlOf(int rs, int wc)  return rs | wc;  // 获取ctl对象
复制代码
public void execute(Runnable command) 
    if (command == null) // 任务为空,抛出NPE
        throw new NullPointerException();
        
    int c = ctl.get(); // 获取当前工作线程数和线程池运行状态(共32位,前3位为运行状态,后29位为运行线程数)
    if (workerCountOf(c) < corePoolSize)  // 如果当前工作线程数小于核心线程数
        if (addWorker(command, true)) // 在addWorker中创建工作线程并执行任务
            return;
        c = ctl.get();
    
    
    // 核心线程数已满(工作线程数>核心线程数)才会走下面的逻辑
    if (isRunning(c) && workQueue.offer(command))  // 如果当前线程池状态为RUNNING,并且任务成功添加到阻塞队列
        int recheck = ctl.get(); // 双重检查,因为从上次检查到进入此方法,线程池可能已成为SHUTDOWN状态
        if (! isRunning(recheck) && remove(command)) // 如果当前线程池状态不是RUNNING则从队列删除任务
            reject(command); // 执行拒绝策略
        else if (workerCountOf(recheck) == 0) // 当线程池中的workerCount为0时,此时workQueue中还有待执行的任务,则新增一个addWorker,消费workqueue中的任务
            addWorker(null, false);
    
    // 阻塞队列已满才会走下面的逻辑
    else if (!addWorker(command, false)) // 尝试增加工作线程执行command
        // 如果当前线程池为SHUTDOWN状态或者线程池已饱和
        reject(command); // 执行拒绝策略

复制代码
private boolean addWorker(Runnable firstTask, boolean core) 
    retry: // 循环退出标志位
    for (;;)  // 无限循环
        int c = ctl.get();
        int rs = runStateOf(c); // 线程池状态

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && 
            ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()) // 换成更直观的条件语句
            // (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
           )
           // 返回false的条件就可以分解为:
           //(1)线程池状态为STOP,TIDYING,TERMINATED
           //(2)线程池状态为SHUTDOWN,且要执行的任务不为空
           //(3)线程池状态为SHUTDOWN,且任务队列为空
            return false;

        // cas自旋增加线程个数
        for (;;) 
            int wc = workerCountOf(c); // 当前工作线程数
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize)) // 工作线程数>=线程池容量 || 工作线程数>=(核心线程数||最大线程数)
                return false;
            if (compareAndIncrementWorkerCount(c)) // 执行cas操作,添加线程个数
                break retry; // 添加成功,退出外层循环
            // 通过cas添加失败
            c = ctl.get();  
            // 线程池状态是否变化,变化则跳到外层循环重试重新获取线程池状态,否者内层循环重新cas
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        
    
    // 简单总结上面的CAS过程:
    //(1)内层循环作用是使用cas增加线程个数,如果线程个数超限则返回false,否者进行cas
    //(2)cas成功则退出双循环,否者cas失败了,要看当前线程池的状态是否变化了
    //(3)如果变了,则重新进入外层循环重新获取线程池状态,否者重新进入内层循环继续进行cas

    // 走到这里说明cas成功,线程数+1,但并未被执行
    boolean workerStarted = false; // 工作线程调用start()方法标志
    boolean workerAdded = false; // 工作线程被添加标志
    Worker w = null;
    try 
        w = new Worker(firstTask); // 创建工作线程实例
        final Thread t = w.thread; // 获取工作线程持有的线程实例
        if (t != null) 
            final ReentrantLock mainLock = this.mainLock; // 使用全局可重入锁
            mainLock.lock(); // 加锁,控制并发
            try 
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get()); // 获取当前线程池状态

                // 线程池状态为RUNNING或者(线程池状态为SHUTDOWN并且没有新任务时)
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) 
                    if (t.isAlive()) // 检查线程是否处于活跃状态
                        throw new IllegalThreadStateException();
                    workers.add(w); // 线程加入到存放工作线程的HashSet容器,workers全局唯一并被mainLock持有
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                
             finally 
                mainLock.unlock(); // finally块中释放锁
            
            if (workerAdded)  // 线程添加成功
                t.start(); // 调用线程的start()方法
                workerStarted = true;
            
        
     finally 
        if (! workerStarted) // 如果线程启动失败,则执行addWorkerFailed方法
            addWorkerFailed(w);
    
    return workerStarted;

复制代码
private void addWorkerFailed(Worker w) 
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try 
        if (w != null)
            workers.remove(w); // 线程启动失败时,需将前面添加的线程删除
        decrementWorkerCount(); // ctl变量中的工作线程数-1
        tryTerminate(); // 尝试将线程池转变成TERMINATE状态
     finally 
        mainLock.unlock();
    

复制代码
final void tryTerminate() 
    for (;;) 
        int c = ctl.get();
        // 以下情况不会进入TERMINATED状态:
        //(1)当前线程池为RUNNING状态
        //(2)在TIDYING及以上状态
        //(3)SHUTDOWN状态并且工作队列不为空
        //(4)当前活跃线程数不等于0
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0)  // 工作线程数!=0
            interruptIdleWorkers(ONLY_ONE); // 中断一个正在等待任务的线程
            return;
        

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try 
            // 通过CAS自旋判断直到当前线程池运行状态为TIDYING并且活跃线程数为0
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) 
                try 
                    terminated(); // 调用线程terminated()
                 finally 
                    ctl.set(ctlOf(TERMINATED, 0)); // 设置线程池状态为TERMINATED,工作线程数为0
                    termination.signalAll(); // 通过调用Condition接口的signalAll()唤醒所有等待的线程
                
                return;
            
         finally 
            mainLock.unlock();
        
        // else retry on failed CAS
    

复制代码

Worker源码解读

Worker是ThreadPoolExecutor类的内部类,此处只讲最重要的构造函数和run方法

private final class Worker extends AbstractQueuedSynchronizer implements Runnable

    // 该worker正在运行的线程
    final Thread thread;
    
    // 将要运行的初始任务
    Runnable firstTask;
    
    // 每个线程的任务计数器
    volatile long completedTasks;

    // 构造方法   
    Worker(Runnable firstTask) 
        setState(-1); // 调用runWorker()前禁止中断
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this); // 通过ThreadFactory创建一个线程
    

    // 实现了Runnable接口的run方法
    public void run() 
        runWorker(this);
    
    
    ... // 此处省略了其他方法

复制代码

Worker实现了Runable接口,在调用start()方法后,实际执行的是run方法

final void runWorker(Worker w) 
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask; // 获取工作线程中用来执行任务的线程实例
    w.firstTask = null;
    w.unlock(); // status设置为0,允许中断
    boolean completedAbruptly = true; // 线程意外终止标志
    try 
        // 如果当前任务不为空,则直接执行;否则调用getTask()从任务队列中取出一个任务执行
        while (task != null || (task = getTask()) != null) 
            w.lock(); // 加锁,保证下方临界区代码的线程安全
            // 如果状态值大于等于STOP且当前线程还没有被中断,则主动中断线程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt(); // 中断当前线程
            try 
                beforeExecute(wt, task); // 任务执行前的回调,空实现,可以在子类中自定义
                Throwable thrown = null;
                try 
                    task.run(); // 执行线程的run方法
                 catch (RuntimeException x) 
                    thrown = x; throw x;
                 catch (Error x) 
                    thrown = x; throw x;
                 catch (Throwable x) 
                    thrown = x; throw new Error(x);
                 finally 
                    afterExecute(task, thrown); // 任务执行后的回调,空实现,可以在子类中自定义
                
             finally 
                task = null; // 将循环变量task设置为null,表示已处理完成
                w.completedTasks++; // 当前已完成的任务数+1
                w.unlock();
            
        
        completedAbruptly = false;
     finally 
        processWorkerExit(w, completedAbruptly);
    

复制代码

从任务队列中取出一个任务

private Runnable getTask() 
    boolean timedOut = false; // 通过timeOut变量表示线程是否空闲时间超时了
    // 无限循环
    for (;;) 
        int c = ctl.get(); // 线程池信息
        int rs = runStateOf(c); // 线程池当前状态

        // 如果线程池状态>=SHUTDOWN并且工作队列为空 或 线程池状态>=STOP,则返回null,让当前worker被销毁
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) 
            decrementWorkerCount(); // 工作线程数-1
            return null;
        

        int wc = workerCountOf(c); // 获取当前线程池的工作线程数

        // 当前线程是否允许超时销毁的标志
        // 允许超时销毁:当线程池允许核心线程超时 或 工作线程数>核心线程数
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 如果(当前线程数大于最大线程数 或 (允许超时销毁 且 当前发生了空闲时间超时))
        // 且(当前线程数大于1 或 阻塞队列为空)
        // 则减少worker计数并返回null
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) 
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        

        try 
            // 根据线程是否允许超时判断用poll还是take(会阻塞)方法从任务队列头部取出一个任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r; // 返回从队列中取出的任务
            timedOut = true;
         catch (InterruptedException retry) 
            timedOut = false;
        
    

复制代码

总结一下哪些情况getTask()会返回null:

线程池状态为SHUTDOWN且任务队列为空 线程池状态为STOP、TIDYING、TERMINATED 线程池线程数大于最大线程数 线程可以被超时回收的情况下等待新任务超时

工作线程退出

private void processWorkerExit(Worker w, boolean completedAbruptly) 
    // 如果completedAbruptly为true则表示任务执行过程中抛出了未处理的异常
    // 所以还没有正确地减少worker计数,这里需要减少一次worker计数
    if (completedAbruptly) 
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try 
        // 把将被销毁的线程已完成的任务数累加到线程池的完成任务总数上
        completedTaskCount += w.completedTasks;
        workers.remove(w); // 从工作线程集合中移除该工作线程
     finally 
        mainLock.unlock();
    

    // 尝试结束线程池
    tryTerminate();

    int c = ctl.get();
    // 如果是RUNNING 或 SHUTDOWN状态
    if (runStateLessThan(c, STOP)) 
        // worker是正常执行完
        if (!completedAbruptly) 
            // 如果允许核心线程超时则最小线程数是0,否则最小线程数等于核心线程数
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 如果阻塞队列非空,则至少要有一个线程继续执行剩下的任务
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // 如果当前线程数已经满足最小线程数要求,则不需要再创建替代线程
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        
        // 重新创建一个worker来代替被销毁的线程
        addWorker(null, false);
    

 小伙伴们有兴趣想了解更多相关学习资料请点赞收藏+评论转发+关注我之后私信我,注意回复【000】即可获取更多免费资料!

 

 

超详细的java线程池源码解析

线程池的继承关系是这样的ThreadPoolExecutor继承了AbstractExecutorService,AbstractExecutorService是一个抽象类,它实现了ExecutorService接口,ExecutorService又是继承了Executor接口。 



继承关系:

ThreadPoolExecutor->AbstractExecutorService->ExecutorService->Executor


      线程池的核心方法是execute(Runnable command) 和submit(Runnable task) 而submit方法也是调用execute(Runnable command)完成,所以重点来看execute(Runnable command)的源码


 

[java] view plain copy

  1. public void execute(Runnable command) {  

  2.         if (command == null)  

  3.             throw new NullPointerException();  

  4.         //如果当前线程数小于核心线程数大小执行addWorker()方法,增加一个线程执行  

  5.         int c = ctl.get();  

  6.         if (workerCountOf(c) < corePoolSize) {  

  7.             //成功执行addWorker()就返回  

  8.             if (addWorker(command, true))  

  9.                 return;  

  10.             //没有成功执行获取最新的当前线程数  

  11.             c = ctl.get();  

  12.         }  

  13.         //如果是运行状态,并且加入等待队列成功执行if块(额外含义:线程池是运行状态已经达到核心线程数,优先放入队列)  

  14.         if (isRunning(c) && workQueue.offer(command)) {//1  

  15.             //先获取最新的线程数  

  16.             int recheck = ctl.get();  

  17.             //再次判断如果线程池不是运行态了并且移除本次提交任务成功,执行拒绝操作  

  18.             if (! isRunning(recheck) && remove(command))  

  19.                 reject(command);  

  20.             //如果是运行状态,或者线程不是运行态但是移除任务队列失败,  

  21.             //则检查是否有工作线程在消费队列,如果有则什么都不做(可以确保刚提交进队列的任务被完成),  

  22.             //如果没有需要建立一个消费线程用来消费刚刚提交的任务  

  23.             else if (workerCountOf(recheck) == 0)  

  24.                 addWorker(nullfalse);//2  

  25.         }  

  26.         //如果不是运行态或者加入队列失败那么尝试执行提交过来的任务,如果执行失败,走拒绝操作(额外含义:核心线程数满了,队列也满了,尝试建立新的线程消费,新线程数要小于最大线程数)  

  27.         else if (!addWorker(command, false))  

  28.             reject(command);  

  29.     }  





总结概述: 

来了新任务,如果工作线程数还没有达到线程池的核心线程数尝试创建新的线程执行(addWork方法里)。 

如果已经达到核心线程数或者开启新线程失败,检查线程池是否为运行态,是的话加入等待队列。 

如果线程池是已经不再运行态或者加入等待队列失败,尝试开启一个线程执行刚提交的任务,开线程失败执行拒绝流程。 

如果是运行态并且也加入到等待队列成功,检查线程池是否还是运行(可能被其他线程停止),如果不是运行态,执行移除操作,然后执行拒绝策略, 

如果是运行态或者不是运行态但移除任务失败检查还有没有线程在消费任务,没有的话尝试建立一个消费线程消费刚提交到等待队列里的任务


消费任务的重要方法是addWorker(Runnable firstTask, boolean core); 

其有四种组合: 

一、addWorker(Runnable,true)小于核心线程数使用 

二、addWorker(Runnable,false)大于核心线程数,并且等待队列也满了情况使用 

三、addWorker(null,true)没有任务创建一个线程等待任务到来使用(小于核心线程数的情况) 

四、addWorker(null,false)没有任务创建一个线程等待任务到来使用(小于最大线程数的情况)


 

[java] view plain copy

  1. private boolean addWorker(Runnable firstTask, boolean core) {  

  2.         retry:  

  3.         for (;;) {  

  4.             int c = ctl.get();  

  5.             int rs = runStateOf(c);  

  6.   

  7.             // 线程池状态RUNNING= -1;SHUTDOWN=0;STOP=1;TIDYING=2;TERMINATED=3  

  8.             // 如果线程池状态是shutdown及以后的任意一种状态,说明调用了关闭线程池的方法,  

  9.             //并且不符合[rs等于shutdown,并且传进来的任务是空,并且工作队列不等于空],  

  10.             //这个判断条件是为了处理上个方法代码2处的情况,  

  11.             //即线程池已经不是运行态(仅仅调用了shutdown方法),并且弹出队列失败,  

  12.             //这种情况需要保证提交上来的任务得到执行,因此传过来一个null的任务,  

  13.             //目的是为了让线程池启动一个线程执行刚提交的任务,  

  14.             //(隐含shutdown状态添加到队列中的任务(移除失败的)还是会被执行),  

  15.             //如果已经不只是SHUTDOWN证明掉用过shutdownnow方法,直接返回false,  

  16.             //或者仅调用shutdown后又来的新任务也返回false拒绝执行,  

  17.             //或者是刚添加到队列的任务已经被其他线程消费过了,也返回false  

  18.             if (rs >= SHUTDOWN &&  

  19.                 ! (rs == SHUTDOWN &&  

  20.                    firstTask == null &&  

  21.                    ! workQueue.isEmpty()))  

  22.                 return false;  

  23.   

  24.             for (;;) {  

  25.   

  26.                 int wc = workerCountOf(c);  

  27.                 //检查工作线程数,如果大于线程池最大上限CAPACITY(即使用int低29位可以容纳的最大值)  

  28.                 //或者跟边界值比较已经到达边界值都返回false  

  29.                 if (wc >= CAPACITY ||  

  30.                     wc >= (core ? corePoolSize : maximumPoolSize))  

  31.                     return false;  

  32.                 //如果增加工作数成功跳出循环往下执行  

  33.                 if (compareAndIncrementWorkerCount(c))  

  34.                     break retry;  

  35.                 c = ctl.get();  // Re-read ctl  

  36.                 //如果增加工作线程数失败(可能调用了shutdown方法),  

  37.                 //如果两次状态不一致则跳转到retry处重新尝试执行  

  38.                 if (runStateOf(c) != rs)  

  39.                     continue retry;  

  40.                 // 都没发生循环执行  

  41.             }  

  42.         }  

  43.   

  44.         boolean workerStarted = false;  

  45.         boolean workerAdded = false;  

  46.         Worker w = null;  

  47.         try {  

  48.             //把传进来的任务包装成worker对象  

  49.             w = new Worker(firstTask);  

  50.             //实际上t就是worker对象,只不过有名字等相关信息  

  51.             final Thread t = w.thread;  

  52.             if (t != null) {  

  53.                 final ReentrantLock mainLock = this.mainLock;  

  54.                 mainLock.lock();  

  55.                 try {  

  56.                     // 再次检查线程池状态  

  57.                     int rs = runStateOf(ctl.get());  

  58.                     //如果是运行态直接执行,或如果是shutdown状态但传进来是个null,即前边说的移除队列失败情况  

  59.                     if (rs < SHUTDOWN ||  

  60.                         (rs == SHUTDOWN && firstTask == null)) {  

  61.                         if (t.isAlive()) // 检查这个对象是否被其他线程执行过  

  62.                             throw new IllegalThreadStateException();  

  63.                         //加入到workers中  

  64.                         workers.add(w);  

  65.                         int s = workers.size();  

  66.                         //如果大于曾经执行过的最大线程数则最大线程数加1  

  67.                         if (s > largestPoolSize)  

  68.                             largestPoolSize = s;  

  69.                         workerAdded = true;  

  70.                     }  

  71.                 } finally {  

  72.                     mainLock.unlock();  

  73.                 }  

  74.                 //如果增加成功启动新线程执行  

  75.                 if (workerAdded) {  

  76.                     t.start();  

  77.                     workerStarted = true;  

  78.                 }  

  79.             }  

  80.         } finally {  

  81.             //如果启动失败从workers中移除  

  82.             if (! workerStarted)  

  83.                 addWorkerFailed(w);  

  84.         }  

  85.         return workerStarted;  

  86.     }  




总结概述:这个方法功能是保证在线程池为运行状态下或者虽然不是运行状态但是强制要求把已经添加到任务队列的线程执行完,执行的过程是创建一个新线程执行


从上方代码看出Worker是执行线程的核心,那么看下这个内部类是怎样的,首先它实现了Runable接口,并且继承了AbstractQueuedSynchronizer类



[java] view plain copy

  1. private final class Worker extends AbstractQueuedSynchronizer implements Runnable  

  2.   

  3.          final Thread thread;  

  4.         /** Initial task to run.  Possibly null. */  

  5.         Runnable firstTask;  

  6.   

  7.         Worker(Runnable firstTask) {  

  8.             setState(-1); // inhibit interrupts until runWorker  

  9.             //指向提交过来的任务  

  10.             this.firstTask = firstTask;  

  11.             //指向自己  

  12.             this.thread = getThreadFactory().newThread(this);  

  13.         }  

  14.         public void run() {  

  15.             runWorker(this);  

  16.         }  

  17.   

  18. }  





没有太多特别的不多解释


run方法调用的是runWorker()方法这个是运行的核心



[java] view plain copy

  1. final void runWorker(Worker w) {  

  2.         Thread wt = Thread.currentThread();//当前线程  

  3.         Runnable task = w.firstTask;//提交上来的任务  

  4.         w.firstTask = null;  

  5.         w.unlock(); // 调用Worker类的tryRelease()方法,将state置为0,   

  6.         //而interruptIfStarted()中只有state>=0才允许调用中断  

  7.         boolean completedAbruptly = true;  

  8.         try {  

  9.             //先执行提交上来的任务,完成后循环从队列中取任务执行  

  10.             while (task != null || (task = getTask()) != null) {  

  11.                 w.lock();//加锁保证调用中断后运行的任务可以正常完成  

  12.                 //执行新任务前要做以下判断  

  13.                 //1如果线程池状态是大于等于stop(调用shutdownnow方法了),  

  14.                 //直接查看当前线程符合未设置中断位 则直接调用wt.interrupt()方法设置  

  15.                 //2如果线程池不是大于等于stop状态,则调用Thread.interrupted()清除interrupt位,  

  16.                 //这时如果程池为大于stop状态(有其他线程调用线程池的stopnow方法),  

  17.                 //再查看当前线程符合未设置中断位,如果没设置调用wt.interrupt()方法设置  

  18.                 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())  

  19.                     //线程池是运行态不会走到这  

  20.                     wt.interrupt();//尝试终止正在执行的任务,这里仅仅设置一个标志位  

  21.                 try {  

  22.                     beforeExecute(wt, task);  

  23.                     Throwable thrown = null;  

  24.                     try {  

  25.                         //直接调用run方法,在当前线程中执行  

  26.                         task.run();  

  27.                     } catch (RuntimeException x) {  

  28.                         thrown = x; throw x;  

  29.                     } catch (Error x) {  

  30.                         thrown = x; throw x;  

  31.                     } catch (Throwable x) {  

  32.                         thrown = x; throw new Error(x);  

  33.                     } finally {  

  34.                         afterExecute(task, thrown);  

  35.                     }  

  36.                 } finally {  

  37.                     task = null;  

  38.                     w.completedTasks++;  

  39.                     w.unlock();  

  40.                 }  

  41.             }  

  42.             completedAbruptly = false;  

  43.         } finally {  

  44.             processWorkerExit(w, completedAbruptly);  

  45.         }  

  46.     }  





总结概述:此方法特别在执行线程直接在当前线程中调用线程队列中的run方法,而没有新建线程,确保了线程的重复利用


线程执行完当前任务会循环读取队列中等待的任务,下边看看如何取队列中的任务


 

[java] view plain copy

  1. private Runnable getTask() {  

  2.         boolean timedOut = false;   

  3.   

  4.         for (;;) {  

  5.             int c = ctl.get();  

  6.             int rs = runStateOf(c);  

  7.             //线程池状态RUNNING= -1;SHUTDOWN=0;STOP=1;TIDYING=2;TERMINATED=3  

  8.             // 如果线程池大于等于SHUTDOWN(调用过shutdown方法),  

  9.             //判断是否是stop(调用shutdownnow)之后的状态或者等待队列已经为空  

  10.             //言外之意调用过shutdownnow将停止执行等待队列中的任务,  

  11.             //还有只掉用过shutdown方法会保证工作队列中的任务会被执行完  

  12.             if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {  

  13.                 //已经调用shutdown或者等待队列中的任务已经执行完,如果调用shutdownnow队列中的任务还没执行完那就放弃执行  

  14.                 //减少工作线程数  

  15.                 decrementWorkerCount();  

  16.                 return null;  

  17.             }  

  18.   

  19.             int wc = workerCountOf(c);  

  20.   

  21.             // 工作线程数大于核心线程数或者核心线程超时时间为真(默认为false)  

  22.             //allowCoreThreadTimeOut为true超时会关闭线程  

  23.             boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;  

  24.             //1工作线程数大于最大线程数或【超时关闭标志位真且真的超时了】  

  25.             //2 上个条件成立(言外之意工作线程数大于最大线程数或者已经查过空闲时间没任务,  

  26.             //此时可能需要关闭一个线程了),并且确实有线程在工作(有工作线程才需要关闭),  

  27.             //或者任务队列没工作任务了(没任务了对应的是超时那种情况)  

  28.             //可能情况:1.wc > maximumPoolSize成立,wc > 1成立  

  29.                             //:大于核心线程数,有线程在运行,关闭一个线程  

  30.             //        2.wc > maximumPoolSize成立,workQueue.isEmpty() 成立  

  31.                             //:大于核心线程数,队列中已经没有任务可执行,关闭一个线程  

  32.             //        3.(timed && timedOut)成立,wc > 1 成立  

  33.                             //:线程空闲超时,有线程在运行,关闭一个线程  

  34.             //          4.(timed && timedOut)成立,workQueue.isEmpty()成立  

  35.                             // :线程空闲超时,队列中没有可执行的任务  

  36.             if ((wc > maximumPoolSize || (timed && timedOut))  

  37.                 && (wc > 1 || workQueue.isEmpty())) {  

  38.                 //工作数量减一并返回null 返回null上层方法就会结束当前线程  

  39.                 if (compareAndDecrementWorkerCount(c))  

  40.                     return null;  

  41.                 continue;  

  42.             }  

  43.   

  44.             try {  

  45.                 //如果上述情况不满足则正常取任务执行  

  46.                 Runnable r = timed ?  

  47.                //没有任务会挂起指定时间(言外之意已经大于核心数或者有超时时间的不能永久的阻塞下去)  

  48.                     workQueue.take();//没有任务会阻塞直到有任务来  

  49.                 if (r != null)  

  50.                     return r;  

  51.                 timedOut = true;  

  52.             } catch (InterruptedException retry) {  

  53.                 timedOut = false;  

  54.             }  

  55.         }  

  56.     }  




总结概述:只要线程池没有调用shutDown就尝试取任务消费,已调用shutdown但队列还有任务没执行完,尝试取执行。大于核心线程数或者已经超时队列中没任务可执行,则尝试关闭当前线程。


整体总结:有新任务来到,如果没有达到核心线程数,则启动新线程执行,已经达到核心线程数尝试放到队列,核心线程数和队列都满但核心线程数没有达到最大线程数再建立一个线程执行,如果都满了就拒绝执行。 

执行过程中要重复不断的检查线程池的状态,如果只调用过shutDown,但线程池中还有等待执行的队列则取执行完等待的任务,并拒绝新到的任务(抛出异常),如果调用shutDownNow方法则放弃执行队列中的任务,并尝试终止正则执行的任务。 

如果工作线程数大于核心线程数或者线程空闲时间大于设置时间,那么尝试终止当前线程。如果没有设置超时终止则没有任务执行时线程阻塞。




以上是关于「超详细」Java线程池源码解析的主要内容,如果未能解决你的问题,请参考以下文章

超详细的线程池使用解析

Java Executor源码解析—Executors线程池工厂以及四大内置线程池

Java Executor源码解析—ThreadPoolExecutor线程池execute核心方法源码一万字

Java Executor源码解析—ThreadPoolExecutor线程池submit方法以及FutureTask源码一万字

Java Executor源码解析—ThreadPoolExecutor线程池的介绍和基本属性一万字

Java多线程学习(吐血超详细总结)