ThreadPoolExecutor解析-部分源码研究

Posted 智公博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ThreadPoolExecutor解析-部分源码研究相关的知识,希望对你有一定的参考价值。

注:本文的分析和源码基于jdk1.7;

一、ThreadPoolExecutor创建

ThreadPoolExecutor作为java.util.concurrent包中核心的类,先看下类型的结构:

UML类图

 

最顶级的接口都是Executor,而ThreadPoolExecutor继承于抽象类AbstractExecutorService,提供以下4个构造函数用于创建:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,ong keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue);

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory)

 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler)

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)

 

前面的3个方法都是使用通过this调用最后一个方法,没有指定的构造参数使用默认参数,参数解析:

1、

    /**
     * Core pool size is the minimum number of workers to keep alive
     * (and not allow to time out etc) unless allowCoreThreadTimeOut
     * is set, in which case the minimum is zero.
     */
    private volatile int corePoolSize;

线程池核心线程数大小,初始化时核心线程数也是0,除非先调用prestartCoreThread或者prestartAllCoreThreads先创建核心线程;在没有设置allowCoreThreadTimeOut为true情况下,核心线程不会销毁;

2、

    /**
     * Maximum pool size. Note that the actual maximum is internally
     * bounded by CAPACITY.
     */
    private volatile int maximumPoolSize;

线程池线程数最大值,达到最大值后线程池不会再增加线程执行任务,任务会进入等待队列或者由拒绝策略处理;

该值实际的可设置最大值不是Integer.MAX_VALUE,而是常量CAPACITY(后面再解析常量)

3、

    /**
     * Timeout in nanoseconds for idle threads waiting for work.
     * Threads use this timeout when there are more than corePoolSize
     * present or if allowCoreThreadTimeOut. Otherwise they wait
     * forever for new work.
     */
    private volatile long keepAliveTime;

空闲工作线程的空闲时间;超过corePoolSize的线程或者allowCoreThreadTimeOut为true的主线程使用这个作为超时时间;

否则线程一直等待任务或关闭;

4、

    /**
     * The queue used for holding tasks and handing off to worker
     * threads.  We do not require that workQueue.poll() returning
     * null necessarily means that workQueue.isEmpty(), so rely
     * solely on isEmpty to see if the queue is empty (which we must
     * do for example when deciding whether to transition from
     * SHUTDOWN to TIDYING).  This accommodates special-purpose
     * queues such as DelayQueues for which poll() is allowed to
     * return null even if it may later return non-null when delays
     * expire.
     */
    private final BlockingQueue<Runnable> workQueue;

  这个队列用于保存任务以及为工作线程提供待执行的任务;不要求该队列的poll方法返回null表示该队列为空,队列是否为空(用于决定是否从shutdown状态变为tidying状态)仅仅依靠isEmpty方法判断;这是为了兼容延时队列poll方法可能返回为null,但在延时到期实际返回时非空;

5、

    /**
     * Factory for new threads. All threads are created using this
     * factory (via method addWorker).  All callers must be prepared
     * for addWorker to fail, which may reflect a system or user's
     * policy limiting the number of threads.  Even though it is not
     * treated as an error, failure to create threads may result in
     * new tasks being rejected or existing ones remaining stuck in
     * the queue.
     *
     * We go further and preserve pool invariants even in the face of
     * errors such as OutOfMemoryError, that might be thrown while
     * trying to create threads.  Such errors are rather common due to
     * the need to allocate a native stack in Thread#start, and users
     * will want to perform clean pool shutdown to clean up.  There
     * will likely be enough memory available for the cleanup code to
     * complete without encountering yet another OutOfMemoryError.
     */
    private volatile ThreadFactory threadFactory;

线程工厂,线程生成器;所有线程通过这个工厂创建(通过 addWorker方法)任何调用都要做好可能由系统或者用户设置的线程数限制策略导致的创建失败;虽然失败不能被当做错误处理,但是创建线程失败可能导致新任务被拒绝,已经存在任务继续阻塞在队列里等待执行;

 

6、

    /**
     * Handler called when saturated or shutdown in execute.
     */
    private volatile RejectedExecutionHandler handler;

任务拒绝策略;负责处理当线程饱和后(线程数达到最大,等待队列满了)、线程池正在关闭时的新提交任务;

ThreadPoolExecutor内部有实现4个拒绝策略:

(1)、CallerRunsPolicy,由调用execute方法提交任务的线程来执行这个任务;

(2)、AbortPolicy,抛出异常RejectedExecutionException拒绝提交任务;

(3)、DiscardPolicy,直接抛弃任务,不做任何处理;

(4)、DiscardOldestPolicy,去除任务队列中的第一个任务(最旧的),重新提交;

推荐使用前面两种拒绝策略,特别是对于不知道如何使用或第一次使用线程池;当然也可以自己实现拒绝策略,

只要继承java.util.concurrent.RejectedExecutionHandler接口;

该值默认是AbortPolicy;

 

通过上面的4个构造函数创建完线程池后,就可以通过submit或execute方法提交任务;

工作完成后当然最后要关闭线程池,可以调用下面两个方法:

shutdown():对已经在执行的线程进行关闭,不再接收新的任务;

shutdownNow():尝试停止正在执行的线程,会清除任务队列中的任务;

以上两个方法都不会等到所有任务完成后关闭,需要通过awaitTermination(long, TimeUnit)方法实现;

二、以下通过部分源码研究分析,是按照我自己的思路往下写,不一定符合每个人的理解顺序;

ThreadPoolExecutor线程池的状态:

    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

其中COUNT_BITS是 int 位数

private static final int COUNT_BITS = Integer.SIZE - 3;  //Integer.SIZE=32

所以实际 COUNT_BITS = 29,

用上面的5个常量表示线程池的状态,实际上是使用32位中的高3位表示;后面还会讲到这些常量;

高3位:

RUNNING=111

SHUTDOWN=000

STOP=001

TIDYING=010

TERMINATED=110

//线程池最大线程数=536870911(2^29-1)

private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

前面的构造函数虽然传入的maximumPoolSize是个int值,但是实际最大值是这个;

这样线程池的状态和线程数量就由一个变量存储:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //使用AtomicInteger 当然是为了保证多线程同步问题

ctl 可以理解为control(控制),初始值为线程数0,状态RUNNING:

private static int ctlOf(int rs, int wc) { return rs | wc; }

接下来通过状态来看线程池的运行:

完成线程的创建后,首先通过 submit 或者 execute 方法提交,实际submit 方法也是内部调用 execute方法;我们只看这个方法:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

这个方法主要有三个逻辑(if...if...else):

1、判断当前的线程数是否小于corePoolSize如果是,使用入参任务通过addWord方法创建一个新的线程,如果能完成新线程创建exexute方法结束,成功提交任务;

2、在第一步没有完成任务提交;状态为运行并且能成功加入任务到工作队列后,再进行一次check,如果状态在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了),非运行状态下当然是需要reject;然后再判断当前线程数是否为0(有可能这个时候线程数变为了0),如是,新增一个线程;

3、如果不能加入任务到工作队列,将尝试使用任务新增一个线程,如果失败,则是线程池已经shutdown或者线程池已经达到饱和状态,所以reject;

从上面新增任务的execute方法也可以看出,拒绝策略不仅仅是在饱和状态下使用,在线程池进入到关闭阶段同样需要使用到;

上面的几行代码还不能完全清楚这个新增任务的过程,肯定还需要清楚addWorker方法才行:

private boolean addWorker(Runnable firstTask, boolean core) {//firstTask:新增一个线程并执行这个任务,可空,增加的线程从队列获取任务;core:是否使用corePoolSize作为上限,否则使用maxmunPoolSize</span>
        retry: //较少使用到的标识符,用于重试
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))//线程状态非运行并且当非shutdown状态下任务为空且队列非空;
                return false;  //判断条件有点难理解,其实是非运行状态下(>=SHUTDOWN)或者SHUTDOWN状态下任务非空(新提交任务)、任务队列为空,就不可以再新增线程了(return false),即SHUTDOWN状态是可以新增线程去执行队列中的任务;
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize)) //实际最大线程数是CAPACITY;
                    return false;
                if (compareAndIncrementWorkerCount(c)) //AtomicInteger的CAS操作;
                    break retry;                       //新增线程数成功,结束retry(retry下的for循环)
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs) //状态发生改变,重试retry;
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask); // Worker为内部类,封装了线程和任务,通过ThreadFactory创建线程,可能失败抛异常或者返回null
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // SHUTDOWN以后的状态和SHUTDOWN状态下firstTask为null,不可新增线程
                            throw new IllegalThreadStateException();
                        workers.add(w); //保存到一个HashSet中
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s; //记录最大线程数
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);//失败回退,从wokers移除w,线程数减一,尝试结束线程池(调用tryTerminate方法,后续解析)
        }
        return workerStarted;
    }

通过以上两个方法,对于新增任务应该是比较清楚了,新增任务可以说是对线程池最常的操作了;

接下来解析下可能使用到的关闭方法:shutdown,shutdownNow

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess(); //这个方法校验线程访问许可,不是很理解,后面有时间再单独解析;
            advanceRunState(SHUTDOWN); //转换线程池状态为SHUTDOWN
            interruptIdleWorkers(); //中断所有空闲的线程
            onShutdown(); // 空实现方法,是做shutdown清理操作的
        } finally {
            mainLock.unlock();
        }
        tryTerminate(); //尝试结束线程池(设置状态为TERMINATED)
 }

 

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();//同上
            advanceRunState(STOP);//转换线程池状态到STOP
            interruptWorkers();//中断所有线程
            tasks = drainQueue();//获取到任务队列所有任务,并清空队列
        } finally {
            mainLock.unlock();
        }
        tryTerminate();//同上
        return tasks;
    }

由上可知,两个关闭方法的区别:

1、shutdown设置状态为SHUTDOWN,而shutdownNow设置状态为STOP;

2、shutdown值中断空闲的线程,已提交的任务可以继续被执行,而shutdownNow中断所有线程;

3、shutdown无返回值,shutdownNow返回任务队列中还未执行的任务;

为了更新深入理解,我们再来看下 tryTerminate方法

final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))  //保证只有SHUTDOWN状态下任务队列为空和STOP状态下才以尝试终止
                return;
            if (workerCountOf(c) != 0) { //线程数还不是0情况下不可结束线程池
                interruptIdleWorkers(ONLY_ONE); //只为了中断一个线程?还不是非常理解设计者的意思
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { //CAS操作设置TIDYING状态,注意这里处于循环中,失败会重设的
                    try {
                        terminated(); //空实现方法
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));//最终状态TERNINATED
                        termination.signalAll();//可重入锁的condition,通知所有wait,后面会有看到
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
        }
    }


虽然有shutdown和shutdownNow方法,但是还是不能满足一个需求:就是需要知道等待所有任务已完成线程池结束

这里ThreadPoolExecutor提供了awaitTermination方法满足这个需求:

public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

这个方法两个入参,设置等待超时时间;

如果状态已经是TERMINATED返回true,表示已关闭;

否则一直等到termination的signalAll至超时或者当前线程中断;超时后都线程池都没有关闭,返回false;

 

在上面提到有几个空实现的方法,这些方法不是没有用处的,当有需要继承ThreadPoolExecutor类时,

按需要实现这个方法是最好的了;后面有时间继续分享一些关于继承这个类实现不同需求的分析;

 

希望通过对部分源码的解析,能帮助到自己和其他人更好理解以及试用ThreadPoolExecutor;

对源码只分析上面主要的一些方法,这里不再一一列出,有兴趣的可以阅读完整的代码;

如果有其他问题或者有写的不对的地方,还请评论;
 

以上是关于ThreadPoolExecutor解析-部分源码研究的主要内容,如果未能解决你的问题,请参考以下文章

Java Executor源码解析—ThreadPoolExecutor线程池其他方法的源码

ThreadPoolExecutor源码解析

ScheduledThreadPoolExecutor源码主要部分解析

Java Executor源码解析—ThreadPoolExecutor线程池submit方法以及FutureTask源码一万字

Java Executor源码解析—ThreadPoolExecutor线程池execute核心方法源码一万字

线程池技术之:ThreadPoolExecutor 源码解析