ThreadPoolExecutor线程池设计思路

Posted 热爱编程的大忽悠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ThreadPoolExecutor线程池设计思路相关的知识,希望对你有一定的参考价值。

ThreadPoolExecutor线程池设计思路


引言

本篇文章我将会尝试用自己的语言,用我个人的想法来阐述一下ThreadPoolExecutor的设计思路。

首先,我们来看一下ThreadPoolExecutor的继承体系:

public interface Executor 
    void execute(Runnable command);

Executor意为执行器,作为顶层抽象接口,提供了一个execute方法来执行传入的任务,而该任务如何被执行,则是由不同实现子类来决定的。

  • 按任务执行方式的不同,我们就能构思出下面一副架构图


其次,我们还需要考虑到对任务管理接口的统一,例如: 停止任务,提交任务等… 。 因为,每个不同的子类实现,都有对任务管理的需求,因此我们需要统一任务管理相关接口,防止混乱。

任务管理相关接口由ExecutorService为我们规定好了:

public interface ExecutorService extends Executor 

    //启动有序关闭,其中执行先前提交的任务,但不会接受新任务。如果已经关闭,调用没有额外的效果。
    //此方法不等待先前提交的任务完成执行。使用awaitTermination来做到这一点。
    void shutdown();

    //尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务列表。
    //此方法不等待主动执行的任务终止。使用awaitTermination来做到这一点。
    //除了尽最大努力停止处理正在执行的任务之外,没有任何保证.
    List<Runnable> shutdownNow();

    //如果此执行程序已关闭,则返回true。
    boolean isShutdown();

    //调用shutdown或shutdownNow后,如果所有任务都已完成,则返回true。
    boolean isTerminated();

    //在shutdown请求后阻塞,直到所有任务都完成执行,或者发生超时,或者当前线程被中断,以先发生者为准。
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    //提交一个有返回值的任务以供执行,并返回一个表示该任务待处理结果的 Future。 Future 的get方法将在成功完成后返回任务的结果。
    <T> Future<T> submit(Callable<T> task);

    //提交Runnable任务以执行并返回代表该任务的Future。Future的get方法将在成功完成后返回给定的result。
    <T> Future<T> submit(Runnable task, T result);

    //提交Runnable任务以执行并返回代表该任务的Future。Future 的get方法将在成功完成后返回null。
    Future<?> submit(Runnable task);

    //执行给定的任务,返回一个 Futures 列表,在所有完成时保存它们的状态和结果。
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    //执行给定的任务,当全部完成或超时到期时,返回保存其状态和结果的 Futures 列表。返回时,未完成的任务将被取消。
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    //执行给定的任务,返回已成功完成的任务的结果(即不抛出异常)。正常或异常返回时,取消未完成的任务。
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    //执行给定任务,超时前返回已成功完成的任务的结果(即不抛出异常)。正常或异常返回时,取消未完成的任务。
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

如果我们想要获取任务的返回值,可以通过Executors体系下提供的Future来获取。

对于任务执行器而言,还有一个非常重要的就是任务队列,毕竟一下子来了很多任务,又无法及时处理,那就需要先找个地方存一下嘛。

相关具体实现,如下图所示:

这里先不对Future具体实现进行讲解,大家先了解即可


ThreadPoolExecutor

关于线程池一些常见概念,我这里就不多解释了,不清楚的可以看下面这篇文章:

为什么需要线程池?

ThreadPoolExecutor的核心工作流程其实就如下面这幅图所示一般:

看起来似乎很简单,但是如果要实现,又该从哪里写起来呢?


线程池相关属性

线程池的设计离不开一堆参数来记录线程池当前的状态,那么具体应该记录哪些状态呢?

public class ThreadPoolExecutor extends AbstractExecutorService 

    // 控制变量-存放线程池状态和线程数
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    // 任务队列,必须是阻塞队列
    private final BlockingQueue<Runnable> workQueue;

    // 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才能访问此集合
    private final HashSet<Worker> workers = new HashSet<>();
    
    // 全局锁
    private final ReentrantLock mainLock = new ReentrantLock();

    // awaitTermination方法使用的等待条件变量
    private final Condition termination = mainLock.newCondition();

    // 记录峰值线程数
    private int largestPoolSize;
    
    // 记录已经成功执行完毕的任务数
    private long completedTaskCount;
    
    // 线程工厂,用于创建新的线程实例
    private volatile ThreadFactory threadFactory;

    // 拒绝执行处理器,对应不同的拒绝策略
    private volatile RejectedExecutionHandler handler;
    
    // 空闲线程等待任务的时间周期,单位是纳秒
    private volatile long keepAliveTime;
    
    // 是否允许核心线程超时,如果为true则keepAliveTime对核心线程也生效
    private volatile boolean allowCoreThreadTimeOut;
    
    // 核心线程数
    private volatile int corePoolSize;

    // 线程池容量
    private volatile int maximumPoolSize;

    // 省略其他代码
    

这其中部分状态是随着线程池运行起来后,动态改变的,还有一部分是需要用户提前设置好的,相当于一个指标,例如: 核心线程数,最大线程数等等…

用户可以在ThreadPoolExecutor的构造参数中进行设置:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;

可以自定义核心线程数、线程池容量(最大线程数)、空闲线程等待任务周期、任务队列、线程工厂、拒绝策略。下面简单分析一下每个参数的含义和作用:

  • corePoolSize:int类型,核心线程数量。
  • maximumPoolSize:int类型,最大线程数量,也就是线程池的容量。
  • keepAliveTime:long类型,线程空闲等待时间,也和工作线程的生命周期有关,下文会分析。
  • unit:TimeUnit类型,keepAliveTime参数的时间单位,实际上keepAliveTime最终会转化为纳秒。
  • workQueue:BlockingQueue< Runnable >类型,等待队列或者叫任务队列。
  • threadFactory:ThreadFactory类型,线程工厂,用于创建工作线程(包括核心线程和非核心线程),默认使用Executors.defaultThreadFactory()作为内建线程工厂实例,一般自定义线程工厂才能更好地跟踪工作线程。
  • handler:RejectedExecutionHandler类型,线程池的拒绝执行处理器,更多时候称为拒绝策略,拒绝策略执行的时机是当阻塞队列已满、没有空闲的线程(包括核心线程和非核心线程)并且继续提交任务。提供了4种内建的拒绝策略实现:
    • AbortPolicy:直接拒绝策略,也就是不会执行任务,直接抛出RejectedExecutionException,这是默认的拒绝策略。
    • DiscardPolicy:抛弃策略,也就是直接忽略提交的任务(通俗来说就是空实现)。
    • DiscardOldestPolicy:抛弃最老任务策略,也就是通过poll()方法取出任务队列队头的任务抛弃,然后执行当前提交的任务。
    • CallerRunsPolicy:调用者执行策略,也就是当前调用Executor#execute()的线程直接调用任务Runnable#run(),一般不希望任务丢失会选用这种策略,但从实际角度来看,原来的异步调用意图会退化为同步调用。

线程池状态记录

如同Java线程有五种状态一般,线程池也应该具有不同的状态,并且每种状态之前的切换和行为表现都不同,ThreadPoolExecutor通过ctl属性的前三位来存放线程池当前状态,后29位则存放当前线程池中工作线程数量。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//COUNT_BITS =29
private static final int COUNT_BITS = Integer.SIZE - 3;
//000 111....111 三个0,29个1
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

//线程池的五种状态,每个值只有前三位不同
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// 通过ctl值获取运行状态
private static int runStateOf(int c)      return c & ~COUNT_MASK; 
// 通过ctl值获取工作线程数
private static int workerCountOf(int c)   return c & COUNT_MASK; 

// 通过运行状态和工作线程数计算ctl的值,或运算
private static int ctlOf(int rs, int wc)  return rs | wc; 

private static boolean runStateLessThan(int c, int s) 
    return c < s;


private static boolean runStateAtLeast(int c, int s) 
    return c >= s;


private static boolean isRunning(int c) 
    return c < SHUTDOWN;


// CAS操作线程数增加1
private boolean compareAndIncrementWorkerCount(int expect) 
    return ctl.compareAndSet(expect, expect + 1);


// CAS操作线程数减少1
private boolean compareAndDecrementWorkerCount(int expect) 
    return ctl.compareAndSet(expect, expect - 1);


// 线程数直接减少1
private void decrementWorkerCount() 
    ctl.addAndGet(-1);

接下来分析一下线程池的状态变量,工作线程上限数量位的长度是COUNT_BITS,它的值是Integer.SIZE - 3,也就是正整数29:

我们知道,整型包装类型Integer实例的大小是4 byte,一共32 bit,也就是一共有32个位用于存放0或者1。
在ThreadPoolExecutor实现中,使用32位的整型包装类型存放工作线程数和线程池状态。
其中,低29位用于存放工作线程数,而高3位用于存放线程池状态,所以线程池的状态最多只能有2^3种。
工作线程上限数量为2^29 - 1,超过5亿,这个数量在短时间内不用考虑会超限。

接着看工作线程上限数量掩码COUNT_MASK,它的值是(1 < COUNT_BITS) - l,也就是1左移29位,再减去1,如果补全32位,它的位视图如下:


然后就是线程池的状态常量,这里只详细分析其中一个,其他类同,这里看RUNNING状态:

// -1的补码为:111-11111111111111111111111111111
// 左移29位后:111-00000000000000000000000000000
// 10进制值为:-536870912 
// 高3位111的值就是表示线程池正在处于运行状态
private static final int RUNNING = -1 << COUNT_BITS;

控制变量ctl的组成就是通过线程池运行状态rs和工作线程数wc通过或运算得到的:

// rs=RUNNING值为:111-00000000000000000000000000000
// wc的值为0:000-00000000000000000000000000000
// rs | wc的结果为:111-00000000000000000000000000000
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc)  
    return rs | wc; 

那么我们怎么从ctl中取出高3位?上面源码中提供的runStateOf()方法就是提取运行状态:

// 先把COUNT_MASK取反(~COUNT_MASK),得到:111-00000000000000000000000000000
// ctl位图特点是:xxx-yyyyyyyyyyyyyyyyyyyyyyyyyyyyyy
// 两者做一次与运算即可得到高3位xxx
private static int runStateOf(int c) 
    return c & ~COUNT_MASK; 

同理,取出低29位只需要把ctl和COUNT_MASK(000-11111111111111111111111111111)做一次与运算即可。

小结一下线程池的运行状态常量:


这里有一个比较特殊的技巧,由于运行状态值存放在高3位,所以可以直接通过十进制值(甚至可以忽略低29位,直接用ctl进行比较,或者使用ctl和线程池状态常量进行比较)来比较和判断线程池的状态:

RUNNING(-536870912) < SHUTDOWN(0) < STOP(536870912) < TIDYING(1073741824) < TERMINATED(1610612736)

下面这三个方法就是使用这种技巧:

// ctl和状态常量比较,判断是否小于
private static boolean runStateLessThan(int c, int s) 
    return c < s;


// ctl和状态常量比较,判断是否小于或等于
private static boolean runStateAtLeast(int c, int s) 
    return c >= s;


// ctl和状态常量SHUTDOWN比较,判断是否处于RUNNING状态
private static boolean isRunning(int c) 
    return c < SHUTDOWN;

最后是线程池状态的跃迁图:

PS:线程池源码中有很多中间变量用了简单的单字母表示,例如c就是表示ctl、wc就是表示worker count、rs就是表示running status


execute执行任务

ThreadPoolExecutor需要将任务交给线程池来执行,下面我们就来看看,究竟是如何实现的:

public void execute(Runnable command) 
    if (command == null)
        throw new NullPointerException();
    //获取线程池状态: 前3位表示线程池状态,后29位表示存活线程数量
    int c = ctl.get();
    //1.在核心线程数未满之前,先创建核心线程来执行任务
    //通过位运算拿到工作线程数量,判断当前存活的工作线程数量是否小于核心线程数量
    if (workerCountOf(c) < corePoolSize) 
        //如果核心线程创建成功则直接返回
        if (addWorker(command, true))
            return;
        //这里说明创建核心线程失败,需要更新ctl的临时变量c
        c = ctl.get();
    
    //2. 将任务先放到任务队列中去
    // 走到这里说明创建新的核心线程失败,也就是当前工作线程数大于等于corePoolSize
    // 判断线程池是否处于运行中状态,同时尝试用非阻塞方法向任务队列放入任务(放入任务失败返回false)
    if (isRunning(c) && workQueue.offer(command)) 
        int recheck = ctl.get();
          // 这里是向任务队列投放任务成功,对线程池的运行中状态做二次检查
        // 如果线程池二次检查状态是非运行中状态,则从任务队列移除当前的任务调用拒绝策略处理之(也就是移除前面成功入队的任务实例)
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 走到下面的else if分支,说明有以下的前提:
        // 0、待执行的任务已经成功加入任务队列
        // 1、线程池可能是RUNNING状态
        // 2、传入的任务可能从任务队列中移除失败(移除失败的唯一可能就是任务已经被执行了)
        //  或者我们设置核心线程数量为0
        
        // 如果当前工作线程数量为0,则创建一个非核心线程并且传入的任务对象为null - 返回
        // 也就是创建的非核心线程不会马上运行,而是等待获取任务队列的任务去执行 
        // 如果工作线程数量此时不为零,那么就让存活的工作线程去任务队列取活干
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    
    //3.任务队列已满,下面尝试创建非核心线程来执行任务
    else if (!addWorker(command, false))
        reject(command);

这里简单分析一下整个流程:

  1. 如果当前工作线程总数小于corePoolSize,则直接创建核心线程执行任务(任务实例会传入直接用于构造工作线程实例)。
  2. 如果当前工作线程总数大于等于corePoolSize,判断线程池是否处于运行中状态,同时尝试用非阻塞方法向任务队列放入任务,这里会二次检查线程池运行状态,如果当前工作线程数量为0,则创建一个非核心线程并且传入的任务对象为null。
  3. 如果向任务队列投放任务失败(任务队列已经满了),则会尝试创建非核心线程传入任务实例执行。
  4. 如果创建非核心线程失败,此时需要拒绝执行任务,调用拒绝策略处理任务。

如果一个任务成功加入任务队列,我们依然需要二次检查是否需要添加一个工作线程(因为所有存活的工作线程有可能在最后一次检查之后已经终结)或者执行当前方法的时候线程池是否已经shutdown了。所以我们需要二次检查线程池的状态,必须时把任务从任务队列中移除或者在没有可用的工作线程的前提下新建一个工作线程

任务提交流程从调用者的角度来看如下:


工作线程抽象为Worker

因为我们需要对线程池中每个线程的状态进行记录,因此就无法用一个单纯的List< Thread > 集合来保存所有工作线程,还需要对工作线程做一层包装,我们来看一下ThreadPoolExecutor是如何做的:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable

    // 保存ThreadFactory创建的线程实例,如果ThreadFactory创建线程失败则为null
    final Thread thread;
    // 保存传入的Runnable任务实例
    Runnable firstTask;
    // 记录每个线程完成的任务总数
    volatile long completedTasks;
    
    // 唯一的构造函数,传入任务实例firstTask,注意可以为null
    Worker(Runnable firstTask) 
        // 禁止线程中断,直到runWorker()方法执行
        setState(-1);
        this.firstTask = firstTask;
        // 通过ThreadFactory创建线程实例,注意一下Worker实例自身作为Runnable用于创建新的线程实例
        this.thread = getThreadFactory().newThread(this);
    

    // 委托到外部的runWorker()方法,注意runWorker()方法是线程池的方法,而不是Worker的方法
    public void run() 
        runWorker(this);
    

    //  是否持有独占锁,state值为1的时候表示持有锁,state值为0的时候表示已经释放锁
    protected boolean isHeldExclusively() 
        return getState() != 0;
    

    // 独占模式下尝试获取资源,这里没有判断传入的变量,直接CAS判断0更新为1是否成功,成功则设置独占线程为当前线程
    protected boolean tryAcquire(int unused) 
        if (compareAndSetState(0, 1)) 
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        
        return false;
    
    
    // 独占模式下尝试是否资源,这里没有判断传入的变量,直接把state设置为0
    protected boolean tryRelease(int unused) 
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    
    
    // 加锁
    public void lock()         acquire(1); 

    // 尝试加锁
    public boolean tryLock()   return tryAcquire(1); 

    // 解锁
    public void unlock()       release(1); 

    // 是否锁定

以上是关于ThreadPoolExecutor线程池设计思路的主要内容,如果未能解决你的问题,请参考以下文章

线程池ThreadPoolExecutor

线程池源码分析-ThreadPoolExecutor

JAVA基础学习之-ThreadPoolExecutor的实现原理

美团动态线程池实践思路,开源了!

美团动态线程池实践思路,开源了!

彻底停止运行线程池ThreadPoolExecutor