Java 1.6 ThreadPoolExecutor源码解析

Posted Mr-yuenkin

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 1.6 ThreadPoolExecutor源码解析相关的知识,希望对你有一定的参考价值。

    由于本人水平与表达能力有限,有错误的地方欢迎交流与指正。

一、 成员变量介绍

public class ThreadPoolExecutor extends AbstractExecutorService 

// 线程状态变迁图:
// RUNNING->SHUTDOWN,通过调用shutdown方法
// RUNNING/SHUTDOWN->STOP,调用shutdwonNow方法
// SHUTDOWN->TERMINATED,当workQueue为空且poolSize=0
// STOP->TERMINATED,当poolSize=0
volatile int runState;
// RUNNING状态能接受新任务且能处理队列里的任务
static final int RUNNING    = 0;

// 不能接受新任务,但是能处理队列里的任务
static final int SHUTDOWN   = 1;

// 不能接受新任务,不能处理队列里的任务
static final int STOP       = 2;

// 和STOP一样,且线程池里所有线程都停止了,已没有可用线程了
    static final int TERMINATED = 3;

    // 任务队列,当poolSize==corePoolSize时新任务会优先放入这个队列里
    private final BlockingQueue<Runnable> workQueue;

    /**
     * 一个全局的锁用来保护poolSize, corePoolSize,
     * maximumPoolSize, runState, and workers set等变量的修改。
     */
    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * awaitTermination方法里会用到这个条件
     */
    private final Condition termination = mainLock.newCondition();

    /**
     * 用来执行用户提交的任务,可以理解为线程池中的线程
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

    /**
     * worker线程从workQueue里poll任务的超时时间;
     * 如果当前线程个数>corePoolSize,且allowCoreThreadTimeOut=true线程池会动态收缩线程的数目,注意:单位是纳秒
*/
    private volatile long  keepAliveTime;

    /**
     * 是否允许线程池动态收缩
     */
    private volatile boolean allowCoreThreadTimeOut;

    /**
     * 当池中线程数小于这个值提交新任务时才可能会new一个thread
     */
    private volatile int   corePoolSize;

    /**
     * 线程池允许的最大线程数目
     */
    private volatile int   maximumPoolSize;

    /**
     * 线程池当前的线程数目
     */
    private volatile int   poolSize;


二、 shutdown函数

public void shutdown() 
    SecurityManager security = System.getSecurityManager();
    if (security != null)
        security.checkPermission(shutdownPerm);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try 
        if (security != null)  // Check if caller can modify our threads
            for (Worker w : workers)
                security.checkAccess(w.thread);
        
        int state = runState;
        if (state < SHUTDOWN)
// 一、runState设置成SHUTDOWN
            runState = SHUTDOWN; 

        try 
            for (Worker w : workers) 
               // 二、中断所有空闲的workder线程
                w.interruptIfIdle(); 
            
         catch (SecurityException se)  // Try to back out
            runState = state;
            // tryTerminate() here would be a no-op
            throw se;
        
// 三、尝试terminate线程池(如果满足条件)
        tryTerminate(); // Terminate now if pool and queue empty
     finally 
        mainLock.unlock();
    
void interruptIfIdle() 
    final ReentrantLock runLock = this.runLock;
    if (runLock.tryLock()) 
        try 
		    if (thread != Thread.currentThread())
			thread.interrupt();
         finally 
            runLock.unlock();
          
    
 private void tryTerminate() 
      if (poolSize == 0) 
          int state = runState;
          if (state < STOP && !workQueue.isEmpty()) 
              state = RUNNING; // disable termination check below
              Thread t = addThread(null);
              if (t != null)
                  t.start();
          
          if (state == STOP || state == SHUTDOWN) 
              runState = TERMINATED;
              termination.signalAll();
              terminated();
          
      
  
shutdown函数主要做了三个操作:
1、将线程池的状态设置成SHUTDOWN;
2、中断所有空闲的worker线程;
所谓的中断就是调用worker线程的interrupt()方法。如何判断一个worker线程是否空闲?答案就是worker的成员变量runLock,如果interruptIfIdle函数中runLock.tryLock成功,就能说明worker线程一定是空闲的,因为worker的runTask方法在执行任务前也是要获取runLock锁的,执行完毕会释放。
3、尝试终止线程池(把runState状态改成TERMINATED)。
  如果当前线程池线程数目为0,两种情况需要考虑:
  a、线程池没有被STOP且workQueue不空,这种情况是需要处理workQueue里排队的任务的,因此要new一个thread来处理还在排队的任务;
  b、线程池被STOP或SHUTDOWN且workQueue里没有排队的任务了,此时直接将状态改成TERMINATED,且通知调用awaitTermination方法的线程(线程池TERMINATED了)

三、 shutdownNow函数

public List<Runnable> shutdownNow() 
        /*
         * shutdownNow differs from shutdown only in that
         * 1. runState is set to STOP,
         * 2. all worker threads are interrupted, not just the idle ones, and
         * 3. the queue is drained and returned.
         */
	SecurityManager security = System.getSecurityManager();
	if (security != null)
            security.checkPermission(shutdownPerm);

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try 
            if (security != null)  // Check if caller can modify our threads
                for (Worker w : workers)
                    security.checkAccess(w.thread);
            

            int state = runState;
            if (state < STOP)
                runState = STOP;  // 一、状态改成STOP,和shutdown不同

            try 
                for (Worker w : workers) 
                    w.interruptNow(); // 二、中断所有的worker,不管是不是空闲的
                
             catch (SecurityException se)  // Try to back out
                runState = state;
                // tryTerminate() here would be a no-op
                throw se;
            
            // 三、返回workQueue里排队的任务
            List<Runnable> tasks = drainQueue();            // 四、尝试关闭线程池
            tryTerminate(); // Terminate now if pool and queue empty
            return tasks;
         finally 
            mainLock.unlock();
        
    
void interruptNow() 
 thread.interrupt();
 
shutdownNow函数结构和shutdown结构类似,有些区别:
1、把线程池的状态设置成STOP;
2、调用worker的interruptNow方法,即中断所有worker线程,不管是不是空闲的;
3、这个函数会返回当前workQueue里排队的任务列表,是带返回值的,这些队列里的任务会被删除永远不会被执行了。
小提醒:thread的interrupt方法仅仅是线程的中断标示,某些阻塞方法如thread.sleep join wait及某些IO操作会响应中断并抛出InterruptedException,如果自定义的task是个计算密集的操作或其他不响应中断的操作,shutdownNow调用后还是要等线程执行结束后才能变成TERMINATED状态,要是你的任务是个死循环且不会抛出InterruptedException,那线程池就永远关不掉了。

四、 execute函数

public void execute(Runnable command) 
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) 
            if (runState == RUNNING && workQueue.offer(command)) 
		// 防止并发情况,比如另一个线程在本线程调用玩workQueue.offer后就执行了shutdonwNow方法
		// 这时需要把刚提交的任务从workQueue中remove掉
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        
    
private boolean addIfUnderCorePoolSize(Runnable firstTask) 
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try 
            // 如果当前池中线程数量小于corePoolSize且池是RUNNING的,则new一个thread立即执行提交的任务
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);
         finally 
            mainLock.unlock();
        
        if (t == null)
            return false;
        t.start();
        return true;
    
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) 
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try 
            // 线程会优先入workQueue的,如果入队失败就会执行到这里;
            // 如果池中线程数量小于最大值且线程是RUNNING的,则立马new个thread立即执行刚提交的任务 
            if (poolSize < maximumPoolSize && runState == RUNNING)
                t = addThread(firstTask);
         finally 
            mainLock.unlock();
        
        if (t == null)
            return false;
        t.start();
        return true;
    
private void ensureQueuedTaskHandled(Runnable command) 
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        boolean reject = false;
        Thread t = null;
        try 
            int state = runState;
            // 感觉这里条件state != RUNNING是有问题的,因为SHUTDOWN状态是可以处理队列里的任务的,不能remove掉;应该改成state>=STOP
            if (state != RUNNING && workQueue.remove(command))
                reject = true;
            // 如果队列不空则需要保留一些线程来处理队列里的任务
            else if (state < STOP &&
                     poolSize < Math.max(corePoolSize, 1) &&
                     !workQueue.isEmpty())
                t = addThread(null);
         finally 
            mainLock.unlock();
        
        if (reject)
            reject(command);
        else if (t != null)
            t.start();
    
/**
* 如果不指定该参数,则默认为DefaultThreadFactory类,线程名字格式为pool-poolNumber-thread-theadNumber,每创建一个线程池poolNumber就会增1,每创建一个线程,该池的threadNumber就会增1。
* 线程是非后台线程,优先级为NORM_PRIORITY(5)
*/
private Thread addThread(Runnable firstTask) 
        Worker w = new Worker(firstTask);
        Thread t = threadFactory.newThread(w);
        if (t != null) 
            w.thread = t;
            workers.add(w);
            int nt = ++poolSize;
            if (nt > largestPoolSize)
                largestPoolSize = nt;
        
        return t;
    
execute方法是执行提交的任务(不一定立马执行,可能是先放在队列里、也有可能直接拒绝了),分两种情况:
情况1:poolSize<corePoolSize,调用addIfUnderCorePoolSize方法新增一个worker
情况2:poolSize>=corePoolSize,
①尝试调用workQueue.offer方法,如果入队列失败则到步骤②;
②addIfUnderMaximumPoolSize方法扩大线程池中可用线程个数目,如果这步也失败了则到③;
③执行reject方法

五、 内部类Worker

private final class Worker implements Runnable 
        // 每次运行task都要获取这个锁防止interrupt方法把正在运行的任务也中断了
        private final ReentrantLock runLock = new ReentrantLock();

        private Runnable firstTask;
        // 用来累计这个worker已经完成了多少个任务了,worker退出时可能需要把这个值累计到线程池已经完成的任务上
        volatile long completedTasks;

        // 运行这个worker的线程
        Thread thread;

主循环:

public void run() 
            try 
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null) 
                    runTask(task);
                    task = null;
                
             finally 
                workerDone(this);
            
        
Runnable getTask() 
        for (;;) 
            try 
                int state = runState;
                if (state > SHUTDOWN)
                    return null;
                Runnable r;
                // 如果仅仅是SHUTDOWN,队列里的任务还是要执行的
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                // 如果当前线程数大于corePoolSize或者允许动态收缩,则采用超时获取的方式(因为如果workQueue一直为空线程会一直挂住,就不能动态收缩了)
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                    // 否则,无限等待任务
                    r = workQueue.take();
                if (r != null)
                    return r;
                // 当前worker线程是否可以退出
                if (workerCanExit()) 
                    // 如果此时线程池已经SHUTDOWN了,那么需要向其他空闲线程
                    // 发个中断信号不要一直阻塞在workQueue.take或poll方法中,线程可能需要被回收掉
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                
                // Else retry
             catch (InterruptedException ie) 
                // 被中断了,继续循环检查线程池的状态
            
        
    
// 当workQueue中没有任务时会调用这个方法
// worker退出的条件:①线程池STOP了;②线程池虽然是RUNNING或者SHUTDOWN,但是任务队列空了;③允许动态收缩且当前线程数大于corePoolSize

private boolean workerCanExit() 
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        boolean canExit;
        try 
            canExit = runState >= STOP ||
                workQueue.isEmpty() ||
                (allowCoreThreadTimeOut &&
                 poolSize > Math.max(1, corePoolSize));
         finally 
            mainLock.unlock();
        
        return canExit;
    
private void runTask(Runnable task) 
            final ReentrantLock runLock = this.runLock;
            // 加个锁表明该worker是busy的
            runLock.lock();
            try 
                /*
                 * 除非线程池状态是STOP,否则任务运行前需要清空中断标识;
                 * 但是万一”runState < STOP && Thread.interrupted()”
                 * 之间穿插了一个shutdownNow了该怎么办?因此,需要再次检查下
                 * runState的状态,如果是STOP则要重置下异常标识位
                 */
                if (runState < STOP &&
                    Thread.interrupted() &&
                    runState >= STOP)
                    thread.interrupt();
                /*
                 * 一个标志位,保证task.run方法抛出RuntimeException后是否需要回调afterExecute方法
                 */
                boolean ran = false;
                beforeExecute(thread, task);
                try 
                    task.run();
                    ran = true;
                    afterExecute(task, null);
                    ++completedTasks;
                 catch (RuntimeException ex) 
                    if (!ran)
                        afterExecute(task, ex);
                    throw ex;
                
             finally 
                runLock.unlock();
            
        
void workerDone(Worker w) 
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try 
            // 把worker线程完成的任务数累计到线程池中,并从workers中删除掉
            completedTaskCount += w.completedTasks;
            workers.remove(w);
            if (--poolSize == 0)
                tryTerminate();
         finally 
            mainLock.unlock();
        
    
private void tryTerminate() 
        // 线程池状态变成TERMINATED的条件是:
        // 1、poolSize=0、SHUTDOWN且workQueue为空
        // 2、poolSize=0、STOP
        // 否则,至少要保证线程池中有一个线程处理队列里的消息
        if (poolSize == 0) 
            int state = runState;
            if (state < STOP && !workQueue.isEmpty()) 
                state = RUNNING; // disable termination check below
                Thread t = addThread(null);
                if (t != null)
                    t.start();
            
            if (state == STOP || state == SHUTDOWN) 
                runState = TERMINATED;
                termination.signalAll();
                terminated();
            
        
    
    Worker.run方法很清晰,循环从workQueue里获取task(getTask),然后执行task(runTask),如果获取不到task,线程就退出了,注意:如果你的task里抛出异常,那么这个worker线程也会退出,但是不要紧线程池会再new一个补充的。


以上是关于Java 1.6 ThreadPoolExecutor源码解析的主要内容,如果未能解决你的问题,请参考以下文章

Java 1.6 HttpsURLConnection:java.net.SocketException:连接重置

Hibernate 3 与 Java 1.6 兼容吗?

java 1.6 可以使用pageholder吗

使用 Java 1.6 通过 JMS 连接到 SQS

如何在使用 1.6 运行基本代码时使用 Java 1.7 编写 Junit

已更改为 1.6