ThreadPoolExecutor分析

Posted wolf-w

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ThreadPoolExecutor分析相关的知识,希望对你有一定的参考价值。

1 ThreadPoolExecutor

1.1 常见参数

1.1.1 线程池参数

BlockingQueue<Runnable> workQueue;    // 任务队列。使用workQueue.isEmpty()判断队列是否为空,而非workQueue.poll()==null判断,这样的判空方式容纳特殊队列,如DelayQueue
ReentrantLock mainLock;     
HashSet<Worker> workers;    // 线程池中的所有工作线程,仅能在mainLock下访问
Condition termination;      // 用于支持awaitTermination
// 所有的控制参数都被定义为volatile
ThreadFactory threadFactory;
RejectedExecutionHandler handler;
long keepAliveTime;
boolean allowCoreThreadTimeOut;
int corePoolSize;
int maximumPoolSize;            
int largestPoolSize;        // 线程池中工作线程的历史最大数量:largestPoolSize = workers.size() > largestPoolSize ? workers.size() : largestPoolSize;
long completedTaskCount;    // 完成任务的计数器,仅在工作线程终止时更新,仅在mainLock下访问

1.1.2 核心参数ctl

/* 核心字段,打包了2种含义:worker线程数、线程池状态。
workerCount(低29位):已经被允许start并且不被允许stop的worker的数量。该值可能与活动线程的实际数量会出现短暂性不同
runState(高3位):状态的数值顺序是重要的,以允许有序的比较,状态流转如下
    > RUNNING -> SHUTDOWN,调用shutdown()方法,可能隐含在finalize()方法中
    > RUNNING or SHUTDOWN -> STOP,调用shutdownNow()
    > STOP -> TIDYING,当线程池是empty时
    > TIDYING -> TERMINATED,当terminate()钩子方法执行完成
    当状态为TERMINAED时,线程在awaitTermination()方法上的等待将会返回。
运行状态描述 已知:RUNNING=111, SHUTDOWN=0, STOP=001, TIDYING=010, TERMINATED=011
    RUNNING     // 接收的新任务,并且处理排队中的任务
    SHUTDOWN    // 不接受新任务,但处理排队中的任务
    STOP        // 不接受新任务,不处理排队中的任务,并且中断正在处理的任务
    TIDYING     // 所有任务已经终止,workerCount=0,将运行terminate()钩子方法
    TERMINATED  // terminate()钩子方法执行完毕
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

1.1.3 关于mainLock

ReentrantLock mainLock;
HashSet<Worker> workers;    // 线程池中的所有工作线程,仅能在mainLock下访问

mainLock字段主要是为了保证工作线程池字段workers(非安全集合HashMap)在多线程并发情况下的访问。至于workers为何使用HashMap而非使用安全的ConcurrentHashMap,原因如下所示:

使用mainLock加锁操作会让操作按照顺序一个一个执行。这样保证了interruptIdleWorkers()方法在执行期间避免中断风暴,尤其是在shutdown期间。

注:interruptIdleWorkers()方法只会被shutdown()方法调用

1.2 Worker线程

1.2.1 简介

从类图结构来看:Worker类既是锁,又是线程。

核心代码如下所示,共分为3部分:

  • 多线程核心代码:run()方法
  • AQS核心代码:tryAcquire、tryRelease。由实现可知,该锁是不可重入锁
  • 为worker线程中断提供方法:interruptIfStarted()
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    final Thread thread;
    Runnable firstTask;
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        setState(-1);   // 抑制中断操作,直到runWorker()方法开始运行,runWorker()方法运行时会先执行unlock()方法,将state设置为0
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    // 多线程核心部分:实现run()方法
    public void run() {
        runWorker(this);
    }
    
    // AQS核心实现部分
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    // 中断worker线程
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

1.2.2 抑制中断

worker线程自己提供了中断worker的方法interruptIfStarted()。即创建Worker对象之后,只有在worker线程运行后,才可以执行worker线程的中断操作。

抑制中断的手段:

  1. 将state=-1
  2. 提供方法interruptIfStarted()
  3. tryAcquire()方法中,CAS只有在state=0的时候才能操作成功,即获取到锁

在中断操作的方法中:

  • interruptWorkers():该方法会调用interruptIfStarted()方法中断worker线程,而该方法只有在state>=0的情况下才会执行中断操作
  • interruptIdleWorkers():该方法只有在worker.tryLock()获取到锁时,才能执行中断操作

而上述情况下,state=-1,均不能执行成功。只有在runWorker()方法执行后,才会将state=0操作执行

2.3 execute

2.3.1 execute核心

该方法作为整体方法入口,会根据线程池的线程数量、线程池状态,来决定针对添加的task执行以下哪3种操作:

  • 添加worker线程(前提条件:线程池为运行态)
    • worker线程数量<corePoolSize
    • 任务队列添加新task成功,且线程池仍在运行状态,但此时线程池worker线程数量为0
    • 任务队列添加新task成功,且线程池状态已经不在运行态,但从任务队列中移除新任务失败,但此时线程池worker线程数量为0
  • 放入任务队列(前提条件:线程池为运行态)
    • worker线程数量<corePoolSize,但addWorker()添加线程池失败,且线程池为运行态
    • worker线程数量>=corePoolSize,且线程池为运行态
  • 拒绝任务
    • 线程池为非运行态
    • 线程池数量满了,任务队列满了
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    } else if (!addWorker(command, false))
        reject(command);
}

2.3.2 addWorker

该方法一共干了3件事:

  1. 将线程池中worker线程的数量自增
  2. 创建一个Worker对象,并添加到workers属性中
  3. 添加成功后,启动worker线程

2.4 runWork

源码省略

2.4.1 getTask

该方法有个非常重要的关键点,即:worker线程是否会正常执行结束。即:getTask()方法一旦返回null,表明该worker线程无task可以处理,此时正常情况下该worker线程将会执行结束,线程退出。而getTask()方法中,会会根据timed来决定在BlockingQueue获取task时,是否阻塞等待。

总结:该方法一旦返回NULL,即表明该worker线程将执行结束;否则,worker线程将在BlockingQueue队列中阻塞等待

private Runnable getTask() {
    boolean timedOut = false; // 在workQueue.poll()是否超时
    for (;;) {
        /* 如下操作判断线程池的状态,是否结束该线程:
            1. 线程池状态SHUTDOWN
            2. 线程池状态STOP,且任务队列为空(即没有任何可执行任务)*/
        int c = ctl.get();
        int rs = runStateOf(c);
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        
        int wc = workerCountOf(c);
        /* timed:用于判断worker线程调用BlockingQueue获取task时,阻塞是否为有限期的阻塞。
            timed=true,表示是有超时时间的阻塞
            timed=false,表示无限期阻塞
           allowCoreThreadTimeOut=true,则表示允许核心线程数超时 */
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;  
        // 线程池数量大于maximumPoolSize,该线程获取任务直接返回null。
        if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

2.4.2 processWorkerExit

顾名思义,该方法时worker线程退出前的处理工作:

  1. 如果是异常导致退出worker线程,则减少workerCount数量
  2. 将当前线程完成的任务数量,累加到completedTaskCount中

  3. 尝试进行线程池终止

  4. 如果线程池仍在RUNNING状态,且worker线程没有异常退出,由于getTask()==null,即任务等待队列已经为空,此时判断coreThread是否允许超时,来限制空闲的workers的线程数量

注:是否将线程池的worker线程数量维护在一个稳定范围,仍然需要考虑。该方法就是做如此处理:worker线程在结束前,会判断线程池是否需要新增一个新的worker线程,新增worker线程的情况如下:

  • worker线程为异常终止
  • 线程池中的worker线程数量,小于其应该有的最小值(若允许核心线程运行结束,则最小值为1,否则最小值为corePoolSize)
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // 试着看看线程池是否可以终止
    tryTerminate();
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        /* 新增worker线程的情况如下:
        1. worker线程为异常终止
        2. 线程池中的worker线程数量,小于其应该有的最小值(若允许核心线程运行结束,则最小值为1,否则最小值为corePoolSize)*/
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())  // 如果min == 0,且任务队列中又增加新任务,则将min=1,workers中保留min个worker线程。
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 相当于创建一个新的worker线程,但没有马上为该worker线程分配task。该worker将会从队列中getTask()获取任务。
        addWorker(null, false);
    }
}

2.5 shutdown

2.5.1 代码示例

核心操作如下:

  1. 设置线程池运行状态为SHUTDOWN
  2. 中断所有空闲worker线程(即处于park状态的线程):interruptIdleWorkers()
  3. 调用钩子方法:onShutdown()
  4. 尝试将线程池设置为终止状态:tryTerminate()
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

2.5.2 中断风暴

中断风暴的产生,主要来自于如下方法:

  • 遍历空闲的worker线程
  • 判断是否已经中断,未中断,则进行中断操作

假如多线程并发调用shutdown()方法,此时若没有mainLock锁,让线程有序的进行调用,则可能引发大规模的空闲worker线程中断

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

2.5.3 对比shutdownNow

两个方法如下所示,其中主要区别为:

  1. shutdown()的状态是SHUTDOWN;而shutdownNow()的状态则是STOP
  2. shutdown()中断空闲worker线程;而shutdownNow()则是中断所有worker线程
  3. shutdown()有钩子方法

技术图片

2.5.4 中断worker线程对比

中断worker的线程总共有两类:

  1. 中断空闲worker:若未中断,且不可重入锁加锁成功,再中断。非重入锁加锁的时机有两个:

    • 此处,用于中断空闲线程
    • runWorker()方法中,getTask()获取任务时,会加锁。

    即,运行可执行任务和中断线程的操作,是不可同时发生!!

  2. 中断所有worker:遍历直接中断

技术图片

2.6 总结钩子方法

1、runWorker()中有前置、后置方法

2、shutdown()中有onShutdown()方法

以上是关于ThreadPoolExecutor分析的主要内容,如果未能解决你的问题,请参考以下文章

聊聊高并发(四十)解析java.util.concurrent各个组件(十六) ThreadPoolExecutor源代码分析

Java中的线程池——ThreadPoolExecutor源代码分析

高并发多线程基础之ThreadPoolExecutor源代码分析

线程池ThreadPoolExecutor分析

Android Java 线程池 ThreadPoolExecutor源代码篇

线程池源码分析-ThreadPoolExecutor