Java中的线程池——ThreadPoolExecutor源代码分析

Posted WSYW126

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java中的线程池——ThreadPoolExecutor源代码分析相关的知识,希望对你有一定的参考价值。

线程池ThreadPoolExecutor的使用说明和变量的定义

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 使用一个ctl同时维护线程池的状态和线程数量,不仅仅是为了通过位运算提高效率,能够有效避免两者不一致的情况,如果2个地方存储,可能需要锁去保证一致性。(因为线程池源码同时操作线程池状态和线程数量的地方挺多的)(volatile)

    private static final int COUNT_BITS = Integer.SIZE - 3; //32-3=29

    private static final int CAPACITY   = (1 << COUNT_BITS) - 1; //  00028个11



// runState is stored in the high-order bits 把运行状态保存在最高的三位

    private static final int RUNNING    = -1 << COUNT_BITS; //11128个00

    private static final int SHUTDOWN   =  0 << COUNT_BITS; //00028个00

    private static final int STOP       =  1 << COUNT_BITS; //00128个00

    private static final int TIDYING    =  2 << COUNT_BITS; //01028个00

    private static final int TERMINATED =  3 << COUNT_BITS; //01128个00



// Packing and unpacking ctl

    private static int runStateOf(int c)      return c & ~CAPACITY;  //~CAPACITY = 11128个00 ,位运算取最高三位

    private static int workerCountOf(int c)   return c & CAPACITY;  //位运算取除去最高三位后的29位。

    private static int ctlOf(int rs, int wc)  return rs | wc;  //或运算,拼装ctl



/*

 * Bit field accessors that don't require unpacking ctl.

 * These depend on the bit layout and on workerCount being never negative.

 */



    private static boolean runStateLessThan(int c, int s) 

    return c < s; //比较运行状态

    



    private static boolean runStateAtLeast(int c, int s) 

    return c >= s; //比较运行状态

    



    private static boolean isRunning(int c) 

    return c < SHUTDOWN; //不能用等于,因为只有最高三位是状态位。

    



    /**

      * Attempts to CAS-increment the workerCount field of ctl.

      */

    private boolean compareAndIncrementWorkerCount(int expect) 

    return ctl.compareAndSet(expect, expect + 1); //CAS

    



    /**

      * Attempts to CAS-decrement the workerCount field of ctl.

      */

    private boolean compareAndDecrementWorkerCount(int expect) 

    return ctl.compareAndSet(expect, expect - 1); //CAS

    



    /**

      * Decrements the workerCount field of ctl. This is called only on

      * abrupt termination of a thread (see processWorkerExit). Other

      * decrements are performed within getTask.

      */

    private void decrementWorkerCount() 

    do  while (! compareAndDecrementWorkerCount(ctl.get())); //自旋等待

    

/**
     * The queue used for holding tasks and handing off to worker
     * threads.  We do not require that workQueue.poll() returning
     * null necessarily means that workQueue.isEmpty(), so rely
     * solely on isEmpty to see if the queue is empty (which we must
     * do for example when deciding whether to transition from
     * SHUTDOWN to TIDYING).  This accommodates special-purpose
     * queues such as DelayQueues for which poll() is allowed to
     * return null even if it may later return non-null when delays
     * expire.
     */
    private final BlockingQueue<Runnable> workQueue; //任务队列用于放提交到线程池的任务,并有任务线程处理。

    /**
     * Lock held on access to workers set and related bookkeeping.
     * While we could use a concurrent set of some sort, it turns out
     * to be generally preferable to use a lock. Among the reasons is
     * that this serializes interruptIdleWorkers, which avoids
     * unnecessary interrupt storms, especially during shutdown.
     * Otherwise exiting threads would concurrently interrupt those
     * that have not yet interrupted. It also simplifies some of the
     * associated statistics bookkeeping of largestPoolSize etc. We
     * also hold mainLock on shutdown and shutdownNow, for the sake of
     * ensuring workers set is stable while separately checking
     * permission to interrupt and actually interrupting.
     */
    private final ReentrantLock mainLock = new ReentrantLock(); //当需要访问任务线程集合和相关的记录需要,加锁。

    /**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     */
    private final HashSet<Worker> workers = new HashSet<Worker>(); //线程池任务线程集,当持有mainLock锁时,可以访问线程池任务线程集

    /**
     * Wait condition to support awaitTermination
     */
    private final Condition termination = mainLock.newCondition(); //等待线程池结束条件 

    /**
     * Tracks largest attained pool size. Accessed only under
     * mainLock.
     */
    private int largestPoolSize; //在持有mainLock的情况下,追踪最大线程池 

    /**
     * Counter for completed tasks. Updated only on termination of
     * worker threads. Accessed only under mainLock.
     */
    private long completedTaskCount; //在持有mainLock的情况下,可以访问,completedTaskCount为完成任务计数器,在任务线程结束时更新。 

    /*
     * All user control parameters are declared as volatiles so that
     * ongoing actions are based on freshest values, but without need
     * for locking, since no internal invariants depend on them
     * changing synchronously with respect to other actions.
     */

    /**
     * Factory for new threads. All threads are created using this
     * factory (via method addWorker).  All callers must be prepared
     * for addWorker to fail, which may reflect a system or user's
     * policy limiting the number of threads.  Even though it is not
     * treated as an error, failure to create threads may result in
     * new tasks being rejected or existing ones remaining stuck in
     * the queue.
     *
     * We go further and preserve pool invariants even in the face of
     * errors such as OutOfMemoryError, that might be thrown while
     * trying to create threads.  Such errors are rather common due to
     * the need to allocate a native stack in Thread.start, and users
     * will want to perform clean pool shutdown to clean up.  There
     * will likely be enough memory available for the cleanup code to
     * complete without encountering yet another OutOfMemoryError.
     */
    private volatile ThreadFactory threadFactory; //创建任务线程的工厂

    /**
     * Handler called when saturated or shutdown in execute.
     */
    private volatile RejectedExecutionHandler handler; //当线程池饱和或线程池关闭时,拒绝任务处理handler

    /**
     * Timeout in nanoseconds for idle threads waiting for work.
     * Threads use this timeout when there are more than corePoolSize
     * present or if allowCoreThreadTimeOut. Otherwise they wait
     * forever for new work.
     */
    private volatile long keepAliveTime; //线程池空闲任务线程,等待任务的时间。如果当前线程数量大于核心线程池数量,且allowCoreThreadTimeOut为true,任务线程空闲,允许等待keepAliveTime时间,以便在这个时间范围内,有任务需要执行。

    /**
     * If false (default), core threads stay alive even when idle.
     * If true, core threads use keepAliveTime to time out waiting
     * for work.
     */
    private volatile boolean allowCoreThreadTimeOut; //在当前线程数量大于核心线程池数量的情况下,是否允许空闲任务线程等,保活keepAliveTime时间,等待任务的到来。 

    /**
     * Core pool size is the minimum number of workers to keep alive
     * (and not allow to time out etc) unless allowCoreThreadTimeOut
     * is set, in which case the minimum is zero.
     */
    private volatile int corePoolSize; //在不允许空闲等待的情况,核心线程池数量,即保活的任务线程最小数量。如果允许空闲等待,线程池任务线程可能为0。 

    /**
     * Maximum pool size. Note that the actual maximum is internally
     * bounded by CAPACITY.
     */
    private volatile int maximumPoolSize; // 最大线程池数量,如果容量是有界的,实际为CAPACITY 

    /**
     * The default rejected execution handler
     */
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy(); //默认的拒绝任务策略,抛出运行时异常

小结

 
ThreadPoolExecutor的变量主要有:
核心线程池数量corePoolSize。
最大线程池数量maximumPoolSize。
allowCoreThreadTimeOut 允许空闲任务线程。
保活keepAliveTime时间,等待新任务的到来。
线程工厂ThreadFactory用于创建任务线程。
拒绝任务处理器RejectedExecutionHandler,默认的拒绝任务策略为AbortPolicy,抛出运行时异常。还有:直接丢弃策略DiscardPolicy、丢弃旧的任务DiscardOldestPolicy、调用者执行任务策略CallerRunsPolicy。
上面的变量为volatile,保证可见性,以便线程池执行操作时,可以使用最新的变量。
阻塞的任务队列final BlockingQueue<Runnable> workQueue。
AtomicInteger的ctl用于包装线程状态runState和任务线程数workerCount。
任务线程集final HashSet<Worker> workers。
largestPoolSize记录线程池的最大任务线程数。
completedTaskCount为完成任务计数器,在任务线程结束时更新。
可重入锁mainLock,用于保护非线程安全的变量如workers,largestPoolSize,completedTaskCount。
等待线程池结束条件termination,用于控制超时等待线程池关闭。

 

CTL

 使用一个ctl同时维护线程池的状态和线程数量,不仅仅是为了通过位运算提高效率,能够有效避免两者不一致的情况,如果2个地方存储,可能需要锁去保证一致性。(因为线程池源码同时操作线程池状态和线程数量的地方挺多的)

 

状态解释

  • RUNNING是运行状态,线程池可以接收新任务
  • SHUTDOWN是在调用shutdown()方法以后处在的状态。表示不再接收新任务,但队列中的任务可以执行完毕
  • STOP是在调用shutdownNow()方法以后的状态。不再接收新任务,中断正在执行的任务,抛弃队列中的任务
  • TIDYING表示所有任务都执行完毕
  • TERMINATED为中止状态,调用terminated()方法后,尝试更新为此状态

 

状态变迁过程

* RUNNING -> SHUTDOWN

*    On invocation of shutdown(), perhaps implicitly in finalize()

* (RUNNING or SHUTDOWN) -> STOP

*    On invocation of shutdownNow()

* SHUTDOWN -> TIDYING

*    When both queue and pool are empty

* STOP -> TIDYING

*    When pool is empty

* TIDYING -> TERMINATED

*    When the terminated() hook method has completed

 

线程池执行提交任务和执行任务

执行任务的方法

public void execute(Runnable command) 

    if (command == null)

        throw new NullPointerException();

    /*

     * Proceed in 3 steps:

     *

     * 1. If fewer than corePoolSize threads are running, try to

     * start a new thread with the given command as its first

     * task.  The call to addWorker atomically checks runState and

     * workerCount, and so prevents false alarms that would add

     * threads when it shouldn't, by returning false.

     *

 1.如果工作线程小于核心线程池数量,尝试新建一个工作线程执行任务addWorker。addWorker将会自动检查线程池状态和工作线程数,以防在添加工作线程的过程中,线程池被关闭。 



     * 2. If a task can be successfully queued, then we still need

     * to double-check whether we should have added a thread

     * (because existing ones died since last checking) or that

     * the pool shut down since entry into this method. So we

     * recheck state and if necessary roll back the enqueuing if

     * stopped, or start a new thread if there are none.

2.如果创建工作线程执行任务失败,则任务入队列,如果入队列成功,我们仍需要二次检查线程池状态,以防在入队列的过程中,线程池关闭。如果线程池关闭,则回滚任务。 

     *

     * 3. If we cannot queue task, then we try to add a new

     * thread.  If it fails, we know we are shut down or saturated

     * and so reject the task.

     */

        如果任务入队列失败,则尝试创建一个工作线程执行任务




    int c = ctl.get();

    if (workerCountOf(c) < corePoolSize) 

        if (addWorker(command, true))

            return;

        c = ctl.get();

    

    if (isRunning(c) && workQueue.offer(command)) 

        int recheck = ctl.get();

        if (! isRunning(recheck) && remove(command))

            reject(command);

        else if (workerCountOf(recheck) == 0)

            addWorker(null, false);

    

    else if (!addWorker(command, false))

        reject(command);

    

  • 判断command是否为空
  • 计算线程池中的线程数量,如果数量小于corePoolSize,就创建一个新线程执行任务
  • 如果线程池正在运行状态,且写入队列成功。
    • 再次获取线程池状态判断,如果线程状态变成了非运行状态,就从队列中移除任务,调用reject()方法执行饱和策略handler
    • 如果线程池为空,就创建一个新线程执行任务。【在合适的时机,队列中的任务会被调度】
  • 如果第3步判断没有通过,尝试建立线程执行任务,若没有成功,就执行饱和策略handler

 

添加线程池worker

private boolean addWorker(Runnable firstTask, boolean core) 

    retry:

    for (;;) 

        int c = ctl.get(); //获取ctl

        int rs = runStateOf(c);//获取运行状态



        // Check if queue empty only if necessary.  //判断运行状态和是否为core size=0,添加一个线程。

        if (rs >= SHUTDOWN &&

            ! (rs == SHUTDOWN &&

               firstTask == null &&

               ! workQueue.isEmpty()))

            return false;



        for (;;) 

            int wc = workerCountOf(c);

            if (wc >= CAPACITY ||

                wc >= (core ? corePoolSize : maximumPoolSize))

                return false;

            if (compareAndIncrementWorkerCount(c))

                break retry;

            c = ctl.get();  // Re-read ctl

            if (runStateOf(c) != rs)

                continue retry;

            // else CAS failed due to workerCount change; retry inner loop

        

    



    boolean workerStarted = false;

    boolean workerAdded = false;

    Worker w = null;

    try 

        w = new Worker(firstTask);

        final Thread t = w.thread;

        if (t != null) 

            final ReentrantLock mainLock = this.mainLock;

            mainLock.lock();

            try 

                // Recheck while holding lock.

                // Back out on ThreadFactory failure or if

                // shut down before lock acquired.

                int rs = runStateOf(ctl.get());



                if (rs < SHUTDOWN ||

                    (rs == SHUTDOWN && firstTask == null)) 

                    if (t.isAlive()) // precheck that t is startable

                        throw new IllegalThreadStateException();

                    workers.add(w);

                    int s = workers.size();

                    if (s > largestPoolSize)

                        largestPoolSize = s;

                    workerAdded = true;

                

             finally 

                mainLock.unlock();

            

            if (workerAdded) 

                t.start();

                workerStarted = true;

            

        

     finally 

        if (! workerStarted)

            addWorkerFailed(w);

    

    return workerStarted;

 

  1. 获取当前线程池的状态,如果是STOP,TIDYING,TERMINATED状态的话,则会返回false。如果现在状态是SHUTDOWN,但是firstTask不为空或者workQueue为空的话,那么直接返回false。【防止core size为0,添加一个线程】
  2. 通过自旋的方式,来添加一个ctl计数。
    1. 判断要添加的Worker是否是corePool,如果是的话,那么判断当前的workerCount是否大于corePoolsize,否则判断是否大于maximumPoolSize。
    2. 如果workerCount超出了线程池大小,直接返回false。
    3. 如果小于的话,那么判断是否成功将WorkerCount通过CAS操作增加1,如果增加成功的话。则break retry,进行到第3步。
    4. 否则判断当前线程池的状态,如果现在获取到的状态与进入自旋的状态不一致的话,那么则通过continue retry重新进行状态的判断。
  3. 如果满足了的话,那么则创建一个新的Worker对象,然后获取线程池的重入锁后,判断当前线程池的状态,如果当前线程池状态为STOP,TIDYING,TERMINATED的话,那么调用decrementWorkerCount将workerCount减一,然后调用tryTerminate停止线程池,并且返回false。
  4. 如果状态满足的话,那么则在workers中将新创建的worker添加,并且重新计算largestPoolSize,然后启动Worker中的线程开始执行任务。
  5. 在finally中检查workerStarted,如果为false则走添加worker失败的逻辑。
    1. 加锁,判空。从工作线程集移除工作线程 ,然后工作线程数减-1。
    2. 检查是否线程池关闭,关闭则执行相关工作。

 

添加线程池worker失败

private void addWorkerFailed(Worker w) 

    final ReentrantLock mainLock = this.mainLock;

//加锁

    mainLock.lock();

    try 

//判空

        if (w != null)

//从工作线程集移除工作线程

            workers.remove(w);

//然后工作线程数减-1

        decrementWorkerCount();

//检查是否线程池关闭

        tryTerminate();

     finally 

        mainLock.unlock();

    





final void tryTerminate() 

 //自旋尝试关闭线程池 

    for (;;) 

        int c = ctl.get();

//如果线程池正在运行,或正在关闭且队列不为空,则返回

        if (isRunning(c) ||

            runStateAtLeast(c, TIDYING) ||

            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))

            return;

//如果工作线程不为空,则中断空闲工作线程

        if (workerCountOf(c) != 0)  // Eligible to terminate

            interruptIdleWorkers(ONLY_ONE);

            return;

        



        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try 

//线程池已关闭,任务队列为空,工作线程为0,更新线程池状态为TIDYING

            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) 

                try 

  //执行结束工作  

                    terminated();

                 finally 

//线程池已结束  

                    ctl.set(ctlOf(TERMINATED, 0));

//唤醒等待线程池结束的线程 

                    termination.signalAll();

                

                return;

            

         finally 

            mainLock.unlock();

        

        // else retry on failed CAS

    







//如果onlyOne为true,只中断最多一个空闲工作线程

private void interruptIdleWorkers(boolean onlyOne) 

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try 

//遍历工作线程集 

            for (Worker w : workers) 

                Thread t = w.thread;

                if (!t.isInterrupted() && w.tryLock()) //锁打开说明,工作线程空闲

                    try 

//如果工作线程非中断,且空闲,尝试获取锁,获取锁成功,则中断工作线程 

                        t.interrupt();

                     catch (SecurityException ignore) 

                     finally 

                        w.unlock();

                    

                

//如果是只中断一个空闲线程,则结束本次中断空闲线程任务

                if (onlyOne)

                    break;

            

         finally 

            mainLock.unlock();

        

    

 

 

线程池执行任务

 

调用时机:addWorker里面会调用work.start,work.start即work.run被调用,runWorker被work.run调用,所以worker开始工作了。

final void runWorker(Worker w) 

    Thread wt = Thread.currentThread(); //当前线程

    Runnable task = w.firstTask; //工作线程任务 

    w.firstTask = null;

 //任务线程的锁状态默认为-1,此时解锁+1,变为0,即锁打开状态,允许中断,在任务未执行之前,不允许中断。  

    w.unlock(); // allow interrupts

    boolean completedAbruptly = true;

    try 

//如果任务不为null,即创建工作线程成功,并执行任务,如果为null(即在线程池执行任务的时候,创建工作线程失败,任务入队列),从任务队列取一个任务。

        while (task != null || (task = getTask()) != null) 

            w.lock();

            // If pool is stopping, ensure thread is interrupted;

            // if not, ensure thread is not interrupted.  This

            // requires a recheck in second case to deal with

            // shutdownNow race while clearing interrupt

//如果线程池正在Stop,则确保线程中断;  

//如果非处于Stop之后的状态,则判断是否中断,如果中断则判断线程池是否已关闭

//如果线程池正在关闭,但没有中断,则中断线程池  



            if ((runStateAtLeast(ctl.get(), STOP) ||

                 (Thread.interrupted() &&

                  runStateAtLeast(ctl.get(), STOP))) &&

                !wt.isInterrupted())

                wt.interrupt();

            try 

                beforeExecute(wt, task);

                Throwable thrown = null;

                try 

//执行任务

                    task.run();

                 catch (RuntimeException x) 

                    thrown = x; throw x;

                 catch (Error x) 

                    thrown = x; throw x;

                 catch (Throwable x) 

                    thrown = x; throw new Error(x);

                 finally 

                    afterExecute(task, thrown);

                

             finally 

//任务线程完成任务数量加1,释放锁 

                task = null;

                w.completedTasks++;

                w.unlock();

            

        

//任务已执行完不可以中断

        completedAbruptly = false;

     finally 

        processWorkerExit(w, completedAbruptly);

    

 

有两个关键点:取队列的任务逻辑、所有任务都执行完毕的处理逻辑。

取队列的任务逻辑

private Runnable getTask() 

    boolean timedOut = false; // Did the last poll() time out?



    for (;;) 

        int c = ctl.get();

        int rs = runStateOf(c);



        // Check if queue empty only if necessary.

//如果线程池处于STOP状态,或者SHUTDOWN且等待队列为空,则工作线程数-1

        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) 

            decrementWorkerCount();

            return null;

        



        int wc = workerCountOf(c);



        // Are workers subject to culling?

        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;



//同时满足下面条件,则工作线程数-1

//1. 线程数大于最大线程数 或者 有时间限制且超时。

//2. 线程数大于1 或者 等待队列为空

        if ((wc > maximumPoolSize || (timed && timedOut))

            && (wc > 1 || workQueue.isEmpty())) 

            if (compareAndDecrementWorkerCount(c))

                return null;

//减少工作线程数量失败

            continue;

        



        try 

 //如果非超时则直接take,否则等待keepAliveTime时间,poll任务

            Runnable r = timed ?

                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

                workQueue.take();

            if (r != null)

                return r;

            timedOut = true;

         catch (InterruptedException retry) 

            timedOut = false;

        

    

 

所有任务都执行完毕的处理逻辑

private void processWorkerExit(Worker w, boolean completedAbruptly) 

//如果是被突然中断的,需要对线程数-1。正常结束的,都已经在gettask中-1了。

    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted

        decrementWorkerCount();



    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try 

//汇总各个worker完成的任务

        completedTaskCount += w.completedTasks;

//从工作线程集移除工作线程

        workers.remove(w);

     finally 

        mainLock.unlock();

    



   //检查是否线程池关闭

    tryTerminate();



    int c = ctl.get();

    if (runStateLessThan(c, STOP)) 

        if (!completedAbruptly) 

//设置最小线程数

            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;

//如果等待队列不为空,则min设置为1

            if (min == 0 && ! workQueue.isEmpty())

                min = 1;

//检查现有线程数是否大于需要的线程数。

            if (workerCountOf(c) >= min)

                return; // replacement not needed

        

//添加线程

        addWorker(null, false);

    

 

小结

这章节所有的函数调用关系,如下图:

函数调用关系,也可以当做思维导图来记忆~

 

线程池关闭

shutdown

/** 
     * Initiates an orderly shutdown in which previously submitted 
     * tasks are executed, but no new tasks will be accepted. 
     * Invocation has no additional effect if already shut down. 
     * 
     先前提交的任务将会被工作线程执行,新的线程将会被拒绝。这个方法 
     不会等待提交的任务执行完,我们可以用awaitTermination来等待任务执行完。 
     * <p>This method does not wait for previously submitted tasks to 
     * complete execution.  Use @link #awaitTermination awaitTermination 
     * to do that. 
     * 
     * @throws SecurityException @inheritDoc 
     */  
    public void shutdown()   
        final ReentrantLock mainLock = this.mainLock;  
        mainLock.lock();  
        try   
        //检查线程访问权限  
            checkShutdownAccess();  
        //更新线程池状态为SHUTDOWN  
            advanceRunState(SHUTDOWN);  
        //中断空闲工作线程  
            interruptIdleWorkers();  
        //线程池关闭hook  
            onShutdown(); // hook for ScheduledThreadPoolExecutor  
         finally   
            mainLock.unlock();  
          
        tryTerminate();  
      




private void checkShutdownAccess()   
       SecurityManager security = System.getSecurityManager();  
       if (security != null)   
           security.checkPermission(shutdownPerm);  
           final ReentrantLock mainLock = this.mainLock;  
           mainLock.lock();  
           try   
               for (Worker w : workers)  
        //遍历工作线程集,检查任务线程访问权限  
                   security.checkAccess(w.thread);  
            finally   
               mainLock.unlock();  
             
         
   



private void advanceRunState(int targetState)   
       for (;;)   
           int c = ctl.get();  
           if (runStateAtLeast(c, targetState) ||  
               ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))  
               break;  
         
     


private void interruptIdleWorkers()   
        interruptIdleWorkers(false);  
      

private void interruptIdleWorkers(boolean onlyOne)   
        final ReentrantLock mainLock = this.mainLock;  
        mainLock.lock();  
        try   
            for (Worker w : workers)   
            //遍历工作线程集  
                Thread t = w.thread;  
                if (!t.isInterrupted() && w.tryLock()) //锁打开说明,工作线程空闲  
                    try   
                //如果工作线程非中断,且空闲,尝试获取锁,获取锁成功,则中断工作线程  
                        t.interrupt();  
                     catch (SecurityException ignore)   
                     finally   
                        w.unlock();  
                      
                  
                if (onlyOne)  
            //如果是只中断一个空闲线程,则结束本次中断空闲线程任务  
                    break;  
              
         finally   
            mainLock.unlock();  
          
      

shutdownNow

/** 
     * Attempts to stop all actively executing tasks, halts the 
     * processing of waiting tasks, and returns a list of the tasks 
     * that were awaiting execution. These tasks are drained (removed) 
     * from the task queue upon return from this method. 
     * 
     尝试停止正在执行的任务,停止等待任务线程的处理,任务队列将会被排空, 
     并返回任务队列中的任务集。 
     这个方法不会等待已执行的任务结束,我们用awaitTermination来等待任务执行完 
     * <p>This method does not wait for actively executing tasks to 
     * terminate.  Use @link #awaitTermination awaitTermination to 
     * do that. 
     * 
     * <p>There are no guarantees beyond best-effort attempts to stop 
     * processing actively executing tasks.  This implementation 
     * cancels tasks via @link Thread#interrupt, so any task that 
     * fails to respond to interrupts may never terminate. 
     * 
     * @throws SecurityException @inheritDoc 
     */  
    public List<Runnable> shutdownNow()   
        List<Runnable> tasks;  
        final ReentrantLock mainLock = this.mainLock;  
        mainLock.lock();  
        try   
        //检查工作线程权限  
            checkShutdownAccess();  
        //更新线程池状态为STOP  
            advanceRunState(STOP);  
        //中断空闲工作线程  
            interruptWorkers();  
            //清空任务队列,并放到tasks集合中  
            tasks = drainQueue();  
         finally   
            mainLock.unlock();  
          
    //尝试结束线程池  
        tryTerminate();  
        return tasks;  
      


/** 
     * Drains the task queue into a new list, normally using 
     * drainTo. But if the queue is a DelayQueue or any other kind of 
     * queue for which poll or drainTo may fail to remove some 
     * elements, it deletes them one by one. 
     */  
    private List<Runnable> drainQueue()   
        //这个方法很简单,不再说了  
        BlockingQueue<Runnable> q = workQueue;  
        List<Runnable> taskList = new ArrayList<Runnable>();  
        q.drainTo(taskList);  
        if (!q.isEmpty())   
            for (Runnable r : q.toArray(new Runnable[0]))   
                if (q.remove(r))  
                    taskList.add(r);  
              
          
        return taskList;  
      

 

立即关闭线程与关闭线程池的不同是,对于关闭线程池,先前提交的任务将会被工作线程执行,新的线程将会被拒绝;而立即关闭线程,尝试停止正在执行的任务,停止等待任务线程的处理,任务队列将会被排空,并返回任务队列中的任务集。这两个方法都不会等待任务执行完或任务结束。

 

awaitTermination

当前线程阻塞,直到

1. 等所有已提交的任务(包括正在跑的和队列中等待的)执行完
2. 或者等超时时间到
3. 或者线程被中断,抛出InterruptedException

然后返回true(shutdown请求后所有任务执行完毕)或false(已超时)

public boolean awaitTermination(long timeout, TimeUnit unit)  
        throws InterruptedException   
        long nanos = unit.toNanos(timeout);  
        final ReentrantLock mainLock = this.mainLock;  
        mainLock.lock();  
        try   
        //自旋等待线程线程结束条件  
            for (;;)   
                if (runStateAtLeast(ctl.get(), TERMINATED))  
                    return true;  
                if (nanos <= 0)  
                    return false;  
                nanos = termination.awaitNanos(nanos);  
              
         finally   
            mainLock.unlock();  
          
      

小结

shutdownNow()能立即停止线程池,正在跑的和正在等待的任务都停下了。这样做立即生效,但是风险也比较大;
shutdown()只是关闭了提交通道,用submit()是无效的;而内部该怎么跑还是怎么跑,跑完再停。
shutdown()后,不能再提交新的任务进去;但是awaitTermination()后,可以继续提交。
awaitTermination()是阻塞的,返回结果是线程池是否已停止(true/false);shutdown()不阻塞。

 

 

参考资料:
网络资料
备注:
转载请注明出处:https://blog.csdn.net/WSYW126/article/details/105206243
作者:WSYW126

以上是关于Java中的线程池——ThreadPoolExecutor源代码分析的主要内容,如果未能解决你的问题,请参考以下文章

为什么阿里巴巴要禁用Executors创建线程池?

Android中的线程池

浅理解java中的线程池

浅理解java中的线程池

浅理解java中的线程池

java笔记java中的线程池ThreadPoolExecutor的原理和使用