线程池原理剖析

Posted SpringForAll社区

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池原理剖析相关的知识,希望对你有一定的参考价值。

原文来自我的博客月泉的博客 转载请注明,博客的Github,如果本文对您有帮助的话,可以帮点个star。

食用该文章最好具有队列和AQS的知识,关于AQS可以从我以前书写的文章中找出

线程池是什么

线程大家都清楚是什么?那么线程池是什么?在使用线程的时候有没有考虑过,在平时使用的时候系统中只要你想随处都可以创建线程并且很难管控,基本上一个线程使用完就销毁掉了,要在使用便新建一个线程

直接使用线程的缺陷(针对线程池)

  • 线程数量无法限制,想创建多少个就多少个

  • 线程无法复用,创建启动和销毁线程是会带来一定的开销

线程池的出现主要就是解决2个问题,一个是限制线程的数量和线程复用,在这个扩展上面可以再自行扩展出监控等。

线程池的使用

Java的 Executors工具类就提供了几种现成的创建线程池实例的方法()

  • newFixedThreadPool

  • newSingleThreadPool

  • newCachedThreadPool

  • newScheduledThreadPool

他们最终的放回值都是返回一个 ExecutorService,但 newScheduledThreadPool返回的是 ScheduledExecutorService但该接口也是继承至 ExecutorService接口。

newFixedThreadPool

该方法是创建一个固定线程数量的线程池,其核心线程数和最大线程数都是设的值。

 
   
   
 
  1. class PrintTask implements Runnable{

  2.    private int sequence;

  3.    private long sleepMillis;

  4.    public PrintTask(int sequence, long sleepMillis) {

  5.        this.sequence = sequence;

  6.        this.sleepMillis = sleepMillis;

  7.    }

  8.    @Override

  9.    public void run() {

  10.        try {

  11.            Thread.sleep(sleepMillis);

  12.        } catch (InterruptedException e) {

  13.            e.printStackTrace();

  14.        }

  15.        System.out.println(Thread.currentThread().getName() + " print: " + sequence);

  16.    }

  17. }

  18. public static void main(String[] args) {

  19.    ExecutorService executor = Executors.newFixedThreadPool(10);

  20.    for (int i = 0; i < 12; i++) {

  21.        executor.execute(new PrintTask(i+1, 3000L));

  22.    }

  23.    executor.shutdown();

  24.    System.out.println("Is shutdown : " + executor.isShutdown());

  25.    System.out.println("Is terminated : " + executor.isTerminated());

  26.    try {

  27.        Thread.sleep(5000L);

  28.    } catch (InterruptedException e) {

  29.        e.printStackTrace();

  30.    }

  31.    System.out.println(executor.isTerminated());

  32. }

从示例代码中可以看出,首先我创建了一个固定大小的线程池,其固定数量为10,然后 execute了12个任务,在执行的过程中当前10个任务没有执行完时,第11个和第12个任务会被阻塞到当有空余的线程数量时开始执行,从输出的结果上来看,第11个和第12个任务并没有在新创建线程来执行任务而是复用线程来执行,我全部 execute掉了以后我尝试调用了 shutdown方法来关闭线程池,当然调用了以后并没有真正的关闭线程池,它会等待线程池中所有的任务(包括阻塞队列中没有执行完的任务)都执行完在关闭,通过 isShutdown可以获取到是否调用过 shutdown方法,通过 isTerminated获取线程池是否已经终止,也就是停止掉了,此时返回的是 false因为线程池中的任务还没有执行完,当过了5秒以后再次执行 isTerminated方法返回 true因为线程池已经完全 shutdown

newSingleThreadPool

newSingleThreadPool会创建只有一个线程的线程池,线程由于只有一个,所以 execute的任务会一个一个的执行

 
   
   
 
  1. public static void main(String[] args) {

  2.    ExecutorService executor = Executors.newSingleThreadExecutor();

  3.    for (int i = 0; i < 12; i++) {

  4.        executor.execute(new PrintTask(i+1, 3000L));

  5.    }

  6.    executor.shutdown();

  7. }

newCachedThreadPool

这个线程池创建时在你的感知上是没有数量限制,因为它的实现,给予的最大空闲数量为 Integer.MAX_VALUE,在执行任务时,如果没有多余的空闲线程执行该任务,就会创建一个新的线程来执行这个任务

 
   
   
 
  1. public static void main(String[] args) {

  2.    ExecutorService executor = Executors.newCachedThreadPool();

  3.    for (int i = 0; i < 12; i++) {

  4.        executor.execute(new PrintTask(i+1, 3000L));

  5.    }

  6.    executor.shutdown();

  7. }

newScheduledThreadPool

 
   
   
 
  1. public static void main(String[] args) {

  2.    ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);

  3.    executor.schedule(new PrintTask(1, 3000L), 1L, TimeUnit.SECONDS);

  4.    executor.scheduleAtFixedRate(new PrintTask(2, 2000L), 1, 1, TimeUnit.SECONDS);

  5.    executor.scheduleWithFixedDelay(new PrintTask(3,3000L), 1, 1, TimeUnit.SECONDS);

  6. }

首先创建了一个核心线程数量10的线程池,其最大空闲数量背后的实现仍旧是 Integer.MAXVALUE,然后调用了 schedule该方法是首先是只调度一次任务,第一个参数为要调度的任务,第二参数为时间,第三个参数为时间单位,在接着使用了 scheduleAtFixedRate这是相对频率重复调度任务的方法,第一个参数为要调度的任务,第二个参数为首次开始的延时时间,第三个参数为相对上一个任务开始的延迟执行时间,第四参数为时间单位, scheduleWithFixedDelay第一个参数为要调度的任务,第二个参数为首次延迟执行的时间,第三个参数为上一个任务执行结束后执行的间隔时间,第四个参数为时间单位

以上4种线程池的不同点及场景

以下都是我个人观点,若有异议可以邮箱交流 yuequan1997@gmail.com

newFixedThreadPool

特征

固定大小的线程数量,超出线程数量的任务会阻塞等待到有空闲的线程时执行且长度是原则上无限的(毕竟还受JVM参数或硬件内存大小等影响)

适用场景

可预计或不可预计的并发任务数量且时间是模糊的不确定时间的长短,可预计是指例如我可能每次并发添加10个任务,那么便提前预备好线程数量5个或线程数量10或者20个,以便"高效"的并发执行,不可预计任务数量是指根据当前资源的分配情况下来合理分配线程池中的线程数量,限定它最大并发处理任务的数量。

newSingleThreadPool

特征

固定大小为1的线程数量,重复的利用这1个线程去执行任务,且同时只能执行1个任务,在任务未执行完之前后续添加的任务都会被阻塞且长度是原则上无限的(毕竟还受JVM参数或硬件内存大小等影响)

使用场景

资源有限一次只执行一个任务(本来还想说执行有先后依赖顺序的,其实这个真的不推荐这样设计,所以忽略),其它任务加入队列挨个执行,重复利用一个线程节省线程创建和销毁的开销

newCachedThreadPool

特征

其最大线程数量为 Integer.MAX_VALUE,在任务来时无空闲线程的时候则新开一个线程去执行它,若有空闲则使用空闲线程,每一个新开的线程,都有一个空闲存活60秒的时间,其长度原则上没有限制(毕竟还受JVM参数或硬件内存大小等影响)

使用场景

大部分任务执行时间比较短,而且频繁,使用该线程池即可复用线程又可在突增并发数时创建新的线程从而达到最大并发效率(并发数猛增的时候可能会出现意外,所以如果有这种场景扩展下线程池做个最大限制最为合理)

newScheduledThreadPool

特征

核心大小的线程数为传入的数量,其最大线程数量为 Integer.MAX_VALUE,线程有10秒的空闲存活时间,该线程池最主要的特征其实就是定时调度一次或重复调度

使用场景

任务需要在相对时间频率下执行

线程池的实现

Executor

Executor 是最顶层的接口,它只定义了一个接口方法,用来执行任务

 
   
   
 
  1. public interface Executor {

  2.    void execute(Runnable command);

  3. }

ExecutorService

ExecutorService接口继承至 Executor接口,在它之上扩展了线程池中的生命周期的管理和异步执行

 
   
   
 
  1. public interface ExecutorService extends Executor {

  2.    void shutdown();

  3.    List<Runnable> shutdownNow();

  4.    boolean isShutdown();

  5.    boolean isTerminated();

  6.    boolean awaitTermination(long timeout, TimeUnit unit)

  7.        throws InterruptedException;

  8.    <T> Future<T> submit(Callable<T> task);

  9.    <T> Future<T> submit(Runnable task, T result);

  10.    Future<?> submit(Runnable task);

  11.    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)

  12.        throws InterruptedException;

  13.    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,

  14.                                  long timeout, TimeUnit unit)

  15.        throws InterruptedException;

  16.    <T> T invokeAny(Collection<? extends Callable<T>> tasks)

  17.        throws InterruptedException, ExecutionException;

  18.    <T> T invokeAny(Collection<? extends Callable<T>> tasks,

  19.                    long timeout, TimeUnit unit)

  20.        throws InterruptedException, ExecutionException, TimeoutException;

  21. }

接口方法大意如下:

  • shutdown

    尝试关闭线程池,首先会拒绝接收新的任务,其次会等待正在执行中的所有任务执行完(包括阻塞等待中的)


  • shutdownNow

    尝试关闭线程池,首先也会拒绝接收新的任务,对正在执行中的所有任务发出中断请求,同时抛弃队列中还在等待的任务。


  • isShutdown

    是否尝试关闭线程池,尝试过则返回 true否则返回 false


  • isTerminated

    线程池是否已经终止,用来判断 shutdownshutdownNow是否已经完全关闭了线程池


  • awaitTermination

    根据传入的时间延迟获取线程池关闭的状态,这里需要注意的是是阻塞等待


  • submit

    提交任务至线程池中


  • invokeAll

    批量给定任务,返回执行完的所有任务


  • invokeAny

    批量给定任务,返回一个已执行完的任务


AbstractExecutorService

AbstractExecutorService实现至 ExecutorService其主要实现了 invokeAllinvokeAnysubmit及对于异步任务的 cancel操作,这个类不是最主要关心的。

ThreadPoolExecutor

该类是 AbstractExecutorService的子类,同时也实现了上述所有未实现的接口,简而言之它就是线程池概念抽象的实现

 
   
   
 
  1. public class ThreadPoolExecutor extends AbstractExecutorService {

  2.    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

  3.    private static final int COUNT_BITS = Integer.SIZE - 3;

  4.    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

  5.    private static final int RUNNING    = -1 << COUNT_BITS;

  6.    private static final int SHUTDOWN   =  0 << COUNT_BITS;

  7.    private static final int STOP       =  1 << COUNT_BITS;

  8.    private static final int TIDYING    =  2 << COUNT_BITS;

  9.    private static final int TERMINATED =  3 << COUNT_BITS;

  10.    private final BlockingQueue<Runnable> workQueue;

  11.    private final ReentrantLock mainLock = new ReentrantLock();

  12.    private final HashSet<Worker> workers = new HashSet<>();

  13.    private final Condition termination = mainLock.newCondition();

  14.    private int largestPoolSize;

  15.    private long completedTaskCount;

  16.    private volatile ThreadFactory threadFactory;

  17.    private volatile RejectedExecutionHandler handler;

  18.    private volatile long keepAliveTime;

  19.    private volatile boolean allowCoreThreadTimeOut;

  20.    private volatile int corePoolSize;

  21.    private volatile int maximumPoolSize;

  22.    ......................

  23. }

以上代码片段是该类定义的所有的实例变量及常量,以下是对这些实例变量及常量的一个简要解释

  • ctl

    该变量是一个复合含义的变量,其本身可以看作是一个 Integer变量,该变量的高3位代表线程池的状态,那么后29位(从低位往高位数)代表该线程池数量


  • COUNT_BITS

    数量的位数


  • COUNT_MASK

    数量位数的掩码


  • RUNNING

    表示运行中的状态标识


  • SHUTDOWN

    表示关闭中的状态标识


  • STOP

    表示已停止的状态标识


  • TIDYING

    表示当前所有任务已经终止,任务数量为0时的状态标识


  • TERMINATED

    表示线程池已经完全终止(关闭),关于线程池的关闭状态


  • workQueue

    用来保存等待任务执行的阻塞队列


  • mainLock

    可重入锁,方法里面会大量使用,很多变量的操作都需要使用该锁


  • workers

    该集合中包含了所有在工作的线程


  • termination

    锁条件队列,主要用于 awaitTermination


  • largestPoolSize

    记录线程池最大工作线程的数量(可能是个历史值)


  • completedTaskCount

    完成任务的计时器,仅在中止工作任务时更新


  • threadFactory

    用于创建线程的工厂


  • handler

    饱和策略的回调,当队列已满且线程个数达到 maximumPoolSize时采取的策略

    有以下几种策略


  • AbortPolicy


  • CallerRunsPolicy


  • DiscardOldestPolicy


  • DiscardPolicy

    分别为:抛出异常、使用调用者当前的线程来执行任务、调用队列的 poll丢弃一个任务,执行当前任务、默默丢弃该任务。


  • keepAliveTime

    空闲存活时间,如果线程池中的线程数量比核心线程数量还要多时,并且多出的这些线程都是闲置状态,该变量则是这些闲置状态的线程的存活时间啊


  • allowCoreThreadTimeOut

    默认为 false,即时是空闲核心线程也会处于活动状态,如果设为 true那么核心线程也会遵循 keepAliveTime的时间来做闲置处理


  • corePoolSize

    线程池核心线程数量


  • maximumPoolSize

    线程池最大线程数量


在大致清楚基础的变量后,我们从入口 execute开始

 
   
   
 
  1. public void execute(Runnable command) {

  2.    if (command == null) // 1

  3.        throw new NullPointerException();

  4.    int c = ctl.get(); // 2

  5.    if (workerCountOf(c) < corePoolSize) { // 3

  6.        if (addWorker(command, true)) // 3.1

  7.            return;

  8.        c = ctl.get(); // 3.2

  9.    }

  10.    if (isRunning(c) && workQueue.offer(command)) { // 4

  11.        int recheck = ctl.get(); // 5

  12.        if (! isRunning(recheck) && remove(command)) //6

  13.            reject(command);

  14.        else if (workerCountOf(recheck) == 0) // 7

  15.            addWorker(null, false);

  16.    }

  17.    else if (!addWorker(command, false)) //8

  18.        reject(command);

  19. }

接下来按照注释的序号对其一一解释

  1. 提交的任务如果是个空的则抛出 NullPointerException


  2. 获取复合变量(记录了线程池状态和当前线程池线程数量)


  3. 判断当前线程池的线程数量是否在限定的核心线程数量的访问楼内


  4. 如果在那么就直接调用 addWorker添加一个核心线程,然后 return


  5. 添加失败,重新获取复合变量


  6. 判断线程池是否是运行状态并且添加至阻塞等待队列中


  7. 重新获取状态(可能添加的过程中关闭过线程池之类的并发操作)


  8. 判断线程池是否是运行状态,如果不是将添加的任务删除并采取拒绝措施


  9. 判断线程池中的工作线程数量是否为0,如果为空则添加一个工作线程


  10. 队列添加失败,尝试调用 addWorker以非核心线程的方式添加一条非核心线程执行,失败则采用饱和策略拒绝该任务

    从上面的源码可以看到,如果核心线程数量未达到限定范围则会优先创建核心线程来执行该任务,否则将其加入阻塞等待队列中,如果添加至阻塞等待队列中失败后,则尝试创建一个非核心线程来执行该任务如果失败则采用饱和策略,该方法大量都与 addWorker方法相关。


 
   
   
 
  1. private boolean addWorker(Runnable firstTask, boolean core) {

  2.    retry:

  3.    for (;;) { // 1

  4.        int c = ctl.get(); // 2

  5.        int rs = runStateOf(c); // 3

  6.        if (rs >= SHUTDOWN &&

  7.            ! (rs == SHUTDOWN &&

  8.               firstTask == null &&

  9.               ! workQueue.isEmpty())) // 4

  10.            return false;

  11.        for (;;) { // 5

  12.            int wc = workerCountOf(c); // 6

  13.            if (wc >= CAPACITY ||

  14.                wc >= (core ? corePoolSize : maximumPoolSize)) // 7

  15.                return false;

  16.            if (compareAndIncrementWorkerCount(c)) // 8

  17.                break retry;

  18.            c = ctl.get(); // 9

  19.            if (runStateOf(c) != rs) // 10

  20.                continue retry;

  21.        }

  22.    }

  23.    boolean workerStarted = false;

  24.    boolean workerAdded = false;

  25.    Worker w = null;

  26.    try {

  27.        w = new Worker(firstTask); // 11

  28.        final Thread t = w.thread; // 12

  29.        if (t != null) {

  30.            final ReentrantLock mainLock = this.mainLock; // 13

  31.            mainLock.lock(); // 14

  32.            try {

  33.                int rs = runStateOf(ctl.get()); // 15

  34.                if (rs < SHUTDOWN ||

  35.                    (rs == SHUTDOWN && firstTask == null)) { // 16

  36.                    if (t.isAlive()) // 17

  37.                        throw new IllegalThreadStateException();

  38.                    workers.add(w); // 18

  39.                    int s = workers.size(); // 19

  40.                    if (s > largestPoolSize) // 20

  41.                        largestPoolSize = s;

  42.                    workerAdded = true; // 21

  43.                }

  44.            } finally {

  45.                mainLock.unlock(); // 22

  46.            }

  47.            if (workerAdded) {  // 23

  48.                t.start();

  49.                workerStarted = true;

  50.            }

  51.        }

  52.    } finally {

  53.        if (! workerStarted) // 24

  54.            addWorkerFailed(w);

  55.    }

  56.    return workerStarted; // 25

  57. }

以下是对上述代码的分析

  1. 开始自旋

  2. 获取复合状态

  3. 拿到当前线程池运行状态

  4. 判断在必要时检查队列是否为空

  5. 线程池处于 SHUTDOWN时并且有第一个任务时

  6. 当前线程池为 STOP、 TIDYING、 TERMINATED

  7. 当前线程池为 SHUTDOWN时且任务队列为空时

  8. 以上三种情况都会返回 false

  9. 开启第二轮自旋(其实第一轮自旋就只是检测运行状态)

  10. 获取线程数量

  11. 判断当前线程数量是否超出了最大容量限制或判断当前线程数量是否大于核心线程数或者最大线程数,具体判断是判断核心线程数还是最大线程数取决于调用时传入的 core是否为 true,如果超出了直接返回 false

  12. CAS增加当前线程数量,更改成功结束自旋

  13. 重新获取复合状态

  14. 判断当前线程池状态是否还是运行中,如果不是则跳过第一层自旋的第一次自旋开始第二次

  15. 创建工作线程

  16. 获取工作者中的线程对象

  17. 拿到锁变量

  18. 尝试获取锁(在操作队列时采取同步措施)

  19. 获取当前线程池运行状态

  20. 判断线程池是否在运行状态,否则判断是否是 SHUTDOWN状态且传入的任务为空(有时是只启动一个工作线程)

  21. 判断线程是否是 alive状态

  22. 添加工作线程队列

  23. 取当前工作线程队列的数量

  24. 判断是否大于最大线程数量,如果大于则赋值给它

  25. 设置当前工作者加入线程队列的已添加的标识为 true

  26. 释放锁

  27. 判断当前工作线程是否已经加入工作线程队列,如果以加入则启动该工作线程,并设置启动标识为 true

  28. 判断工作线程启动标识如果为 false则调用 addWorkerFailed

  29. 返回启动标识来决定是否添加成功

步骤还挺多的,简单的总结一下,首先自旋的去增加工作者线程的数量,然后创建工作者(工作线程),然后涉及到队列的操作获取到锁然后添加到工作线程队列设置标识,如果未添加到线程队列中,该工作线程也不会启动,如果添加了那么启动该工作线程,然后设置启动标识,最后返回启动标识

 
   
   
 
  1. private void addWorkerFailed(Worker w) {

  2.    final ReentrantLock mainLock = this.mainLock;

  3.    mainLock.lock();

  4.    try {

  5.        if (w != null)

  6.            workers.remove(w);

  7.        decrementWorkerCount();

  8.        tryTerminate();

  9.    } finally {

  10.        mainLock.unlock();

  11.    }

  12. }

addWorkerFailed实际上就是从工作线程队列中移除当前添加失败的工作线程,然后对工作线程数量-1(其实这一步可以综合的说成是回滚操作)

 
   
   
 
  1. public void shutdown() {

  2.    final ReentrantLock mainLock = this.mainLock;

  3.    mainLock.lock();

  4.    try {

  5.        checkShutdownAccess();

  6.        advanceRunState(SHUTDOWN);

  7.        interruptIdleWorkers();

  8.        onShutdown();

  9.    } finally {

  10.        mainLock.unlock();

  11.    }

  12.    tryTerminate();

  13. }

设置复合状态为 SHUTDOWN,对所有工作线程发出中断请求,调用 onShutdown,在该类中没有对该方法做任何操作,该方法是留给子类做扩展用的,类似于 hook函数,最后释放锁,然后调用 tryTerminate尝试将状态改为 TERMINATED

 
   
   
 
  1. public List<Runnable> shutdownNow() {

  2.    List<Runnable> tasks;

  3.    final ReentrantLock mainLock = this.mainLock;

  4.    mainLock.lock();

  5.    try {

  6.        checkShutdownAccess();

  7.        advanceRunState(STOP);

  8.        interruptWorkers();

  9.        tasks = drainQueue();

  10.    } finally {

  11.        mainLock.unlock();

  12.    }

  13.    tryTerminate();

  14.    return tasks;

  15. }

将状态设为 STOP,中断所有工作线程,丢弃所有等待中的队列,最后释放锁,然后调用 tryTerminate尝试将状态改为 TERMINATED

Worker

从上述的源码中可以看出,所有的任务都是放在一个一个的 worker中执行的,那么 Worker究竟是个啥?

 
   
   
 
  1. private final class Worker

  2.    extends AbstractQueuedSynchronizer

  3.    implements Runnable

  4.    {

  5.        .......

  6.    }

它继承至 AQS,实现了 Runnable接口,其利用 AQS实现了一把简单的锁,它是 ThreadPoolExecutor的一个内部类

 
   
   
 
  1. Worker(Runnable firstTask) {

  2.    setState(-1);

  3.    this.firstTask = firstTask;

  4.    this.thread = getThreadFactory().newThread(this);

  5. }

其仅有一个带参的构造函数,其构造函数首先是将状态设为 -1,其利用 AQS的状态大意是

  • -1 还未准备好,禁止中断

  • 0 unlock

  • 1 lock

不熟悉 AQS的可以自行在我博客中找关于 AQS的文章

设置完状态后利用线程工厂创建了一个线程(注意参数,将自身实例传了进去)

看一个线程实例看什么?很简单看 run就完事了

 
   
   
 
  1. public void run() {

  2.    runWorker(this);

  3. }

从源码中可以看出,将行为委托给了 runWorker并将自身实例传递了过去, runWorker是在 ThreadPoolExecutor中定义的,其实这里主要是做的职责分离(不理解这个做法也无所谓完全无伤大雅)

 
   
   
 
  1. final void runWorker(Worker w) {

  2.    Thread wt = Thread.currentThread(); // 1

  3.    Runnable task = w.firstTask; // 2

  4.    w.firstTask = null; // 3

  5.    w.unlock(); // 4

  6.    boolean completedAbruptly = true; // 5

  7.    try {

  8.        while (task != null || (task = getTask()) != null) { // 6

  9.            w.lock(); // 7

  10.            if ((runStateAtLeast(ctl.get(), STOP) || // 8

  11.                 (Thread.interrupted() &&

  12.                  runStateAtLeast(ctl.get(), STOP))) &&

  13.                !wt.isInterrupted())

  14.                wt.interrupt();

  15.            try {

  16.                beforeExecute(wt, task); // 9

  17.                try {

  18.                    task.run();  // 10

  19.                    afterExecute(task, null); // 11

  20.                } catch (Throwable ex) {

  21.                    afterExecute(task, ex);

  22.                    throw ex;

  23.                }

  24.            } finally {

  25.                task = null;

  26.                w.completedTasks++;

  27.                w.unlock(); // 12

  28.            }

  29.        }

  30.        completedAbruptly = false; // 13

  31.    } finally {

  32.        processWorkerExit(w, completedAbruptly); // 14

  33.    }

  34. }

  1. 获取当前线程

  2. 拿到工作线程中的任务

  3. 将工作线程对象中的任务清空

  4. unlock设置锁状态,这里主要还是设置状态为可中断

  5. 设置一个标识,我习惯将这个表示称为“猝死”标识,它主要是标识这个线程是不是正常执行完,是不是意外中断了或者是执行

  6. 自旋判断当前任务是否为空,如果为空,则调用 getTask拿去一个任务

  7. 获取锁

  8. 判断状态是否是 STOP或者被中断了,如果是则发出中断请求

  9. 方法执行之前的一些 Hook函数,说白了该函数啥都没干,留给子类扩展

  10. 运行任务里面的逻辑

  11. 方法执行之后的一些 Hook函数

  12. 解锁

  13. 设置“猝死”标识为 false没有被猝死233333

  14. 调用 processWorkerExit故名思意,其实其背后就是去删除了 workers中的当前退出工作线程对象和修改数量

getTask方法中除了从阻塞队列中拿去一个任务以外,还有一个作用就是维持当前线程活下去

 
   
   
 
  1. private Runnable getTask() {

  2.    boolean timedOut = false; // 1

  3.    for (;;) {

  4.        int c = ctl.get();

  5.        if (runStateAtLeast(c, SHUTDOWN)

  6.            && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {

  7.            decrementWorkerCount();

  8.            return null;

  9.        }

  10.        int wc = workerCountOf(c);

  11.        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 2

  12.        if ((wc > maximumPoolSize || (timed && timedOut))

  13.            && (wc > 1 || workQueue.isEmpty())) { // 3

  14.            if (compareAndDecrementWorkerCount(c)) // 4

  15.                return null;

  16.            continue; // 5

  17.        }

  18.        try {

  19.            Runnable r = timed ?

  20.                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

  21.                workQueue.take(); // 6

  22.            if (r != null)

  23.                return r;

  24.            timedOut = true;

  25.        } catch (InterruptedException retry) {

  26.            timedOut = false;

  27.        }

  28.    }

  29. }

  1. 超时标志

  2. 定时标志,首先判断是否允许核心线程超时(默认 false)然后判断当前线程池线程数量是否大于核心线程数

  3. 判断当前线程池数是否超过了最大线程数 || 当前线程是否是定时并且已经超时 并且 线程数大于1 或 任务队列为空

  4. 线程数减1,返回 null

  5. 跳过本轮自旋

  6. 从队列中拿取任务

其实所谓的核心线程就是保持它启动后保证在核心线程数内的线程不会挂掉一直在自旋,但如果是设置了 allowCoreThreadTimeOut标志为 true的话那么就意义不大了

Executors

以上线程池分析完了,该工具类是官方提供给我们创建线程池的一些工具集,接着可以看下常用的创建方式的源码

 
   
   
 
  1. public static ExecutorService newFixedThreadPool(int nThreads) {

  2.    return new ThreadPoolExecutor(nThreads, nThreads,

  3.                                  0L, TimeUnit.MILLISECONDS,

  4.                                  new LinkedBlockingQueue<Runnable>());

  5. }

创建最大线程数和核心线程数都为传入的参数的大小,空闲存活时间为0,其使用的队列是无界链表阻塞队列

 
   
   
 
  1. public static ExecutorService newCachedThreadPool() {

  2.    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

  3.                                  60L, TimeUnit.SECONDS,

  4.                                  new SynchronousQueue<Runnable>());

  5. }

创建一个核心线程数为0,但最大线程数为 Integer.MAX_VALUE的线程池,其线程空闲存活时间为60秒,使用同步队列

 
   
   
 
  1. public static ExecutorService newSingleThreadExecutor() {

  2.    return new FinalizableDelegatedExecutorService

  3.        (new ThreadPoolExecutor(1, 1,

  4.                                0L, TimeUnit.MILLISECONDS,

  5.                                new LinkedBlockingQueue<Runnable>()));

  6. }

创建一个核心线程数为1,最大线程数为1的线程池,其线程空闲存活时间为1秒,使用阻塞无界链表队列


推荐: 

上一篇:

关注公众号


以上是关于线程池原理剖析的主要内容,如果未能解决你的问题,请参考以下文章

二期 0005 线程池原理剖析&锁的深度化

线程池原理剖析

线程池核心原理剖析

线程池原理剖析

高效开发:线程池的使用和基本原理剖析

线程池原理剖析