Java Executor源码解析—ThreadPoolExecutor线程池其他方法的源码

Posted 刘Java

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java Executor源码解析—ThreadPoolExecutor线程池其他方法的源码相关的知识,希望对你有一定的参考价值。

前面的文章中,我们介绍了ThreadPoolExecutor线程池的基本特性,以及execute和submit方法的源码,现在我们来看看ThreadPoolExecutor线程池的其他方法的源码解析。

系列文章:

  1. Java Executor源码解析(1)—Executor执行框架的概述
  2. Java Executor源码解析(2)—ThreadPoolExecutor线程池的介绍和基本属性【一万字】
  3. Java Executor源码解析(3)—ThreadPoolExecutor线程池execute核心方法源码【一万字】
  4. Java Executor源码解析(4)—ThreadPoolExecutor线程池submit方法以及FutureTask源码【一万字】
  5. Java Executor源码解析(5)—ThreadPoolExecutor线程池其他方法的源码
  6. Java Executor源码解析(6)—ScheduledThreadPoolExecutor调度线程池源码解析【一万字】

文章目录

1 核心线程预启动

在默认情况下创建的线程池不会有任何线程,只有当新任务到达时,才开始创建和启动核心线程,但是我们可以使用 prestartCoreThread() 和 prestartAllCoreThreads() 方法动态调整。

如果使用非空队列构建池,则可能需要预先启动线程,才能在保证线程池活性。

1.1 prestartCoreThread启动一条

public boolean prestartCoreThread()

启动一个核心线程,使其处于等待工作的空闲状态。如果已启动所有核心线程,此方法将返回 false。

/**
 * 启动一个核心线程,使其处于等待工作的空闲状态。如果已启动所有核心线程,此方法将返回 false。
 *
 * @return 如果一条线程被启动,那么返回true;否则返回false;
 */
public boolean prestartCoreThread() 
    //如果线程数量小于corePoolSize,那么调用addWorker(null, true)启动一条核心线程,启动成功则返回true
    //如果线程数量大于等于corePoolSize,或者addWorker启动失败,那么返回false
    return workerCountOf(ctl.get()) < corePoolSize &&
            addWorker(null, true);

1.2 prestartAllCoreThreads启动全部

public int prestartAllCoreThreads()

启动所有核心线程,使其处于等待工作的空闲状态。如果已启动所有核心线程,此方法将返回,返回通过该方法启动的线程数。

/**
 * 启动所有核心线程,使其处于等待工作的空闲状态。如果已启动所有核心线程,此方法将返回,返回通过该方法启动的线程数。
 *
 * @return 通过该方法启动的线程数
 */
public int prestartAllCoreThreads() 
    //n作为启动的线程计数器
    int n = 0;
    //循环调用addWorker启动核心线程,如果某一次启动失败那么表示核心线程启动完毕或者线程池被关闭等情况
    while (addWorker(null, true))
        //成功之后n自增1
        ++n;
    return n;

2 关闭线程池

2.1 shutdown温和停止

public void shutdown()

关闭线程池。将线程池状态置为SHUTDOWN,正在执行的任务和队列里等待的任务会执行完,停止接收外部提交的新任务,中断所有空闲线程。通过这个方法不能知道所有任务是否都执行完毕!在线程池对象被GC标记的时候,在finalize方法中也会调用该方法!

shutdown执行之后,由于状态变成了SHURDOWN,那么一切的新任务在addWorker方法中将被拒绝,进而触发拒绝策略的执行。队列中的任务以及正在执行的任务将正常执行,由于没有新任务进来,最终所有任务执行完毕!此时线程池中的每一个非等待的线程都将执行processWorkerExit方法将对应的Worker清除,并且workercount线程计数自减,在processWorkerExit中还会调用tryTerminate(),最终至少有一个线程会判断到SHURDOWN状态并且workercount线程数量为0,随后会将线程池状态改为TIDYING—TERMINATED,最终彻底关闭线程池。

假如tryTerminate()方法中没有interruptIdleWorkers(ONLY_ONE)中断空闲线程的逻辑,如果此时有空闲的线程正在等待任务,并且处于阻塞状态,那么该空闲线程将不被通知而无法执行processWorkerExit方法,此时线程池可能无法正常终止。

/**
 * 关闭线程池。将线程池状态置为SHUTDOWN,正在执行的任务和队列里等待的任务会执行完,停止接收外部提交的新任务,中断所有空闲线程。
 * 通过这个方法不能知道所有任务是否都执行完毕!
 *
 * @throws SecurityException 安全管理器不允许中断线程池
 */
public void shutdown() 
    final ReentrantLock mainLock = this.mainLock;
    //获取mainLock锁,即在关闭线程池期间,其他需要锁的操作不被允许
    //比如访问线程池信息、新增、移除、中断Worker等操作
    mainLock.lock();
    try 
        //安全管理器检测是否右关闭线程池的权限
        checkShutdownAccess();
        //线程池状态转换为SHUTDOWN
        advanceRunState(SHUTDOWN);
        //中断所有空闲线程
        interruptIdleWorkers();
        //ThreadPoolExecutor提供空的实现,主要是其子类ScheduledThreadPoolExecutor调用的钩子方法
        onShutdown(); // hook for ScheduledThreadPoolExecutor
     finally 
        //解锁
        mainLock.unlock();
    
    //尝试彻底终止线程池:尝试转换为TIDYING以及TERMINATED状态
    tryTerminate();

2.1.1 advanceRunState切换状态

仅仅被shutdown()和shutdownNow()方法调用,循环的将运行状态转换到指定状态(SHUTDOWN 或者 STOP),除非状态已经大于等于指定状态。

/**
 * 仅仅被shutdown()和shutdownNow()方法调用
 * 循环的将运行状态转换到指定状态,除非状态已经大于等于指定状态
 *
 * @param targetState 指定状态,SHUTDOWN 或者 STOP
 */
private void advanceRunState(int targetState) 
    /*开启一个循环*/
    for (; ; ) 
        //获取此时的ctl值c
        int c = ctl.get();
        //如果运行状态值大于等于指定状态值,那么break退出循环
        //或者 尝试CAS的将ctl的运行状态部分的值转换为指定状态(线程数量部分的值不动)成功,那么break退出循环
        if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
        //如果运行状态值小于指定状态值,或者CAS装换状态失败,那么进行下一次循环重试
    

2.1.2 interruptIdleWorkers中断所有空闲线程

尝试中断所有空闲线程,内部实际上就是调用的interruptIdleWorkers方法,参数传递的是false,而在前面的tryTerminate方法中是调用interruptIdleWorkers(true),表示尝试中断最多一条空闲线程

/**
 * 尝试中断所有空闲线程,内部实际上就是调用的interruptIdleWorkers方法,参数传递的是false
 * 而在前面的tryTerminate方法中是调用interruptIdleWorkers(true),表示尝试中断最多一条空闲线程
 */
private void interruptIdleWorkers() 
    //这个方法在tryTerminate部分已经讲过了
    interruptIdleWorkers(false);

2.2 shutdownNow立即停止

public List< Runnable > shutdownNow()

关闭线程池。将线程池状态置为STOP,正在执行的任务会尽力尝试终止,队列里的任务会移除,停止接收外部提交的新任务,中断所有线程。通过这个方法也不能知道所有任务是否都执行完毕!返回等待执行的任务的列表。

此实现所谓的终止正在执行的任务策略是通过Thread.interrupt()中断线程,很明显,无法响应中断的任何任务都无法被即时终止。这个原理和FutureTask的cancel(true)取消正在执行的任务原理是一样的,都无法提供任何保证!

/**
 * 关闭线程池。将线程池状态置为STOP,正在执行的任务会尽力尝试终止,队列里的任务会移除,停止接收外部提交的新任务,中断所有线程。
 * 通过这个方法也不能知道所有任务是否都执行完毕!返回等待执行的任务的列表。
 *
 * @throws SecurityException 安全管理器不允许中断线程池
 */
public List<Runnable> shutdownNow() 
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    //获取mainLock锁,即在关闭线程池期间,其他需要锁的操作不被允许
    //比如访问线程池信息、新增、移除、中断Worker等操作
    mainLock.lock();
    try 
        //安全管理器检测是否右关闭线程池的权限
        checkShutdownAccess();
        //线程池状态转换为STOP
        advanceRunState(STOP);
        //中断所有线程
        interruptWorkers();
        //移除任务队列中的任务到新集合tasks中
        tasks = drainQueue();
     finally 
        //解锁
        mainLock.unlock();
    
    //尝试彻底终止线程池:尝试转换为TIDYING以及TERMINATED状态
    tryTerminate();
    //返回tasks
    return tasks;

2.2.1 interruptWorkers中断所有运行线程

中断所有空闲和工作状态的线程,但是对于没有runWorker的线程(state=-1)除外。这里就能看出来初始化Worker时state为-1的用处,要求线程必须运行起来才能中断。

/**
 * 中断所有空闲和工作状态的线程,但是对于没有runWorker的线程(state=-1)除外
 */
private void interruptWorkers() 
    final ReentrantLock mainLock = this.mainLock;
    //获取mainLock锁
    mainLock.lock();
    try 
        //遍历workers集合
        for (Worker w : workers)
            //如果线程执行了runWorker(state不等于-1),并且没有被中断,那么中断线程。
            w.interruptIfStarted();
     finally 
        mainLock.unlock();
    


/**
 * 如果线程执行了runWorker(state不等于-1),并且没有被中断,那么中断线程。
 */
void interruptIfStarted() 
    Thread t;
    //如果此Worker的state大于等于0,并且thread不为null,并且thread没有被中断
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) 
        try 
            //那么中断thread
            t.interrupt();
         catch (SecurityException ignore) 
        
    

2.2.2 drainQueue转移全部任务

将任务队列中的任务转移到一个新集合中,仅被shutdownNow()调用。

/**
 * 将任务队列中的任务转移到一个新集合中,仅被shutdownNow()调用
 */
private List<Runnable> drainQueue() 
    BlockingQueue<Runnable> q = workQueue;
    //新建ArrayList集合taskList,用来存放任务
    ArrayList<Runnable> taskList = new ArrayList<Runnable>();
    //首先使用BlockingQueue集合本身drainTo方法尝试一次性转移所有任务,
    //但是可能由于队列的特性无法转移,比如DelayQueue只能转移超时时间到了的数据
    q.drainTo(taskList);
    //如果q还有元素
    if (!q.isEmpty()) 
        //那么遍历q,一个个的转移
        for (Runnable r : q.toArray(new Runnable[0])) 
            if (q.remove(r))
                taskList.add(r);
        
    
    //返回taskList
    return taskList;

3 hook钩子方法

ThreadPoolExecutor为提供了每个任务执行前后提供了钩子方法beforeExecute(Thread,Runnable)和afterExecute(Runnable,Throwable);此外,terminated()方法则会在在Executor变成TIDYING状态后被调用。

这些方法默认都是空实现,我们可以继承ThreadPoolExecutor并重写这些方法来对线程池中线程的运行状态进行了监控,比如监控任务的平均执行时间、最大执行时间和最小执行时间等。或者实现实现其他扩展功能。

钩子方法:

protected void beforeExecute(Thread t, Runnable r)
t - 将运行任务 r 的线程。
r - 将执行的任务。

在runWorker方法中每次执行真正线程任务task.run()方法之前都会执行beforeExecute前置方法。此方法由将执行任务 r 的线程 t 调用。如果前置方法执行过程中抛出异常,那么后面的线程任务将不会执行,并且执行线程会被清理,但是后面的processWorkerExit方法有可能会补充新线程。

protected void afterExecute(Runnable r, Throwable t)
r - 已经完成的任务。
t - 导致任务终止的异常;如果执行正常结束,则为 null。

在runWorker方法中每次线程任务task.run()方法执行完成后或者抛出异常之后都会执行afterExecute后置方法,此方法由执行任务的线程调用。如果后置方法执行过程中抛出异常,那么执行线程会被清理,但是后面的processWorkerExit方法有可能会补充新线程。

protected void terminated()

在线程池变成TIDYING状态之后,即线程池被终止时,就会执行terminated()方法。terminated()方法执行完毕之后线程池就会变成TERMINATED状态。

钩子方法的使用案例:

/**
 * @author lx
 */
public class ThreadPoolHookTest 

    public static void main(String[] args) throws InterruptedException 
        //自定义线程池,这里就直接采用匿名内部类的方法重写这三个方法了
        ExecutorService pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10),
                r -> 
                    System.out.println("线程" + r.hashCode() + "创建");
                    //线程命名
                    return new Thread(r, "threadPool" + r.hashCode());
                , new ThreadPoolExecutor.CallerRunsPolicy()) 

            /**
             * 前置方法
             */
            @Override
            protected void beforeExecute(Thread t, Runnable r) 
                System.out.println("准备执行:" + ((ThreadTask) r).taskName);
                //抛出异常测试
                //int i = 1 / 0;
            

            /**
             * 后置方法
             */
            @Override
            protected void afterExecute(Runnable r, Throwable t) 
                System.out.println("执行完毕:" + ((ThreadTask) r).taskName);
                //抛出异常测试
                //int i = 1 / 0;
            

            /**
             * 终结方法
             */
            @Override
            protected void terminated() 
                System.out.println("----线程池退出----");
                //抛出异常测试
                //int i = 1 / 0;
            
        ;

        for (int i = 0; i < 10; i++) 
            pool.execute(new ThreadTask("Task" + i));
        
        Thread.sleep(1000);
        pool.shutdown();
    

    static class ThreadTask implements Runnable 
        private String taskName;

        ThreadTask(String name) 
            taskName = name;
        

        @Override
        public void run() 
            //输出执行线程的名称
            System.out.println("TaskName" + taskName + "---ThreadName:" + Thread.currentThread().getName());
        
    

4 线程池信息获取

ThreadPoolExecutor的线程池实现提供了许多可以获取线程池核心信息的方法,方便线程池进行实时监控。这些方法基本都需要获取mainLock锁,因此可能会减缓线程池核心功能的效率。

4.1 awaitTermination等待终止

awaitTermination(long timeOut, TimeUnit unit)

指定时间内等待线程池被彻底终止(TERMINATED状态),一般和shutdown连用,用于追踪所有任务是否执行完毕。

如果指定时间范围内线程池被彻底终止(TERMINATED状态),那么返回true;否则返回false。

在进行awaitNanos限时等待时,如果一开始就是中断状态或者如果在signal()或signalAll()方法调用之前就因为中断而被唤醒,那么抛出InterruptedException异常。

/**
 * 指定时间内等待线程池被彻底终止(TERMINATED状态),一般和shutdown连用,用于追踪所有任务是否执行完毕。
 *
 * @param timeout 超时时间
 * @param unit    时间单位
 * @return 如果指定时间范围内线程池被彻底终止(TERMINATED状态),那么返回true;否则返回false。
 * @throws InterruptedException 在进行awaitNanos时,如果一开始就是中断状态
 *                              或者如果在signal()或signalAll()方法调用之前就因为中断而被唤醒,那么抛出异常
 */
public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException 
    //计算超时时间纳秒
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    //获取mainLock锁
    mainLock.lock();
    try 
        /*开启一个死循环,等待状态满足或者超时或者被中断*/
        for (; ; ) 
            //如果线程池状态是TERMINATED状态,那么直接返回true
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            //否则,可能需要超时等待
            //如果剩余超时时间纳秒小于等于0,表示超时时间到了,那么直接返回false
            if (nanos <= 0)
                return false;
            //到这一步,表示还可以继续超时等待,那么当前线程在termination条件变量上超时等待nanos纳秒
            nanos = termination.awaitNanos(nanos);
            //被唤醒之后继续下一次循环或者抛出异常
        
     finally 
        //解锁
        mainLock.unlock();
    

4.2 getTaskCount计划任务数量

public long getTaskCount()

返回已计划执行的任务的总数,返回的值只是一个近似值。返回的值为completedTaskCount + 每一个Worker的completedTasks + 正在执行的任务数量 + 任务队列大小。

/**
 * 返回已计划执行的任务的总数。返回的值只是一个近似值。
 * 返回的值为:completedTaskCount + 每一个Worker的completedTasks + 任务队列大小
 *
 * @return completedTaskCount + 每一个Worker的completedTasks + 任务队列大小
 */
public long getTaskCount() 
    final ReentrantLock mainLock = this.mainLock;
    //获取mainLock锁
    mainLock.lock();
    try 
        //初始化计数器n,初始值为completedTaskCount
        long n = completedTaskCount;
        //遍历workers集合
        for (Worker w : workers) 
            //加上每一个Worker内部的执行任务的计数
            n += w.completedTasks;
            //如果该Worker被锁定了,说明正在执行任务,那么再自增1
            if (w.isLocked())
                ++n;
        
        //最后的返回值还要加上任务队列中计划执行的任务数量
        return n + workQueue.size();
     finally 
        //解锁
        mainLock.unlock();
    

4.3 getActiveCount工作线程数量

public int getActiveCount()

返回正在执行任务的线程的近似数量。统计方式很简单,就是遍历workers队列,如果有Worker被锁定了,那么一定在执行任务。

/**
 * @return 返回正在执行任务的线程的近似数量
 */
public int getActiveCount() 
    final ReentrantLock mainLock = this.mainLock;
    //获取mainLock锁
    mainLock.lock();
    try 
        //初始化计数器
        int n = 0;
        //遍历workers集合
        for (Worker w : workers)
            //如果该Worker被锁定了,说明正在执行任务,那么n自增1
            if (w.isLocked())
                ++n;
        //返回n
        return n;
     finally 
        //解锁
        mainLock.unlock();
    

4.4 getPoolSize当前线程数

public int getPoolSize()

返回池中的当前线程数。如果线程池状态大于等于TIDYING,表示线程池被彻底终止了,那么直接返回0,否则返回workers集合的数量即可。

/**
 * @return 返回池中的当前线程数
 */
public int getPoolSize() 
    final ReentrantLock mainLock = this.mainLock;
    //获取mainLock锁
    mainLock.lock();
    try 
        //如果运行状态大于等于TIDYING,即被终止状态,那么返回0
        //否则返回workers集合的数量
        return runStateAtLeast(ctl.get(), TIDYING) ? 0
                : workers.size();
     finally 
        //解锁
        mainLock.unlock();
    

4.5 getLargestPoolSize曾经最大线程数

public int getLargestPoolSize()

返回曾经同时位于池中的最大线程数。很简单,就是返回largestPoolSize属性。

/**
 * @return 返回曾经同时位于池中的最大线程数
 */
public int getLargestPoolSize() 
    final ReentrantLock mainLock = this.mainLock;
    //获取mainLock锁
    mainLock.lock();
    try 
        //返回largestPoolSize
        return largestPoolSize;
     finally 
        //解锁
        mainLock.unlock();
    

4.6 getCompletedTaskCount已完成任务数

public long getCompletedTaskCount()

返回已完成执行的近似任务总数,该值在整个连续调用过程中不会减少。

由于completedTaskCount仅在一个Worker被清理时才会更新,因此completedTaskCount并不准确,还需要统计每一个Worker中的completedTasks。

/**
 * @return 返回已完成执行的近似任务总数
 */
public long getCompletedTaskCount() 
    final ReentrantLock mainLock = this.mainLock;
    //获取mainLock锁
    mainLock.lock();
    try 
        //初始化计数器n,初始值为completedTaskCount,由于completedTaskCount仅在一个Worker被清理时才会更新以上是关于Java Executor源码解析—ThreadPoolExecutor线程池其他方法的源码的主要内容,如果未能解决你的问题,请参考以下文章

Java Executor源码解析—Executors线程池工厂以及四大内置线程池

Java Executor源码解析—ThreadPoolExecutor线程池submit方法以及FutureTask源码一万字

Java Executor源码解析—ThreadPoolExecutor线程池execute核心方法源码一万字

Java Executor源码解析—ThreadPoolExecutor线程池的介绍和基本属性一万字

mybatis源码解析9---执行器Executor解析

JUC—Executor线程池框架源码深度解析六万字