Java线程池实现原理和源码分析

Posted 静默加载

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java线程池实现原理和源码分析相关的知识,希望对你有一定的参考价值。

Java线程池实现原理和源码分析


文章目录

前言

本文章是从2019年11月下旬开始打开写的,一直拖到2020年的年尾才开始写,直到2021年年初才写完。

时间太快也太慢~!

依稀记得2019年10月份的时候某东从创业公司离职打算面试找工作,他问我线程池你会么?然后给我他发了一篇我2017年写的笔记《Java并发编程之线程池必用知识点》,他说就这么点?我当时想线程池也差不多就这么多吧~!

2019年11月9号我和某东一起从大望路做815公交去燕郊。当时只是因为我正在学习一部分多线程相关的知识点,刚好公交车上没啥事情我俩就唠了唠。当时他问了我一些线程池的问题,我觉得在平时的工作线程池知道该怎么用就行顶多优化一下核心线程数量。主要讨论的还是多线程并发和锁的相关问题。

年底工作一般比较忙也就很少进行自我学习了,隔了一周想起了某东问我的问题“线程池中线程是怎么产生的,任务是怎么等待执行?”。

自己也不是很清楚这块的逻辑,临时把这个TODO项纪录了下来,想以后有时间了研究一下。结果这个以后跨越了2019年和2020年直接来到了2021年。

原谅我的啰里啰嗦,关键这个篇文章时间跨度太长了,给我的印象太深了。不得不说道说道,下面开始进去正题~!

JDK1.8的源码来分析Java线程池的核心设计与实现。

本文参考了Java线程池实现原理及其在美团业务中的实践这篇文章中的部分内容。

Java线程池实现原理及其在美团业务中的实践这篇文章写的非常好,除过本文内容之外这篇文章还讲述了的关于线程池的背景线程池在业务中的实践动态化线程池等,所以想了解线程池关于这些类容的可以阅读Java线程池实现原理及其在美团业务中的实践这篇文章。

如果读者为做服务端开发的同学那么强烈建议阅读Java线程池实现原理及其在美团业务中的实践

外观

外观主要是我们平常使用线程池的时候所看到的一些点。

  • 继承关系;
  • 构造函数;
  • 构造函数中的参数;
  • 构造函数中的阻塞队列;
  • 线程池的创建;
  • 构造函数中的拒绝策略;

线程池继承关系

ThreadPoolExecutor实现的顶层接口是Executor,在接口Executor中用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器Executor中,由Executor框架完成线程的调配和任务的执行部分。

ExecutorService接口增加了一些能力:

  1. 扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;
  2. 提供了管控线程池的方法,比如停止线程池的运行。

AbstractExecutorService则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。

最下层的实现类ThreadPoolExecutor实现最复杂的运行部分:

  1. 可以自动创建、管理和复用指定数量的一组线程,适用方只需提交任务即可

  2. 线程安全,ThreadPoolExecutor内部有状态、核心线程数、非核心线程等属性,广泛使用了CASAQS锁机制避免并发带来的冲突问题

  3. 提供了核心线程、缓冲阻塞队列、非核心线程、抛弃策略的概念,可以根据实际应用场景进行组合使用

  4. 提供了beforeExecuteafterExecute()可以支持对线程池的功能进行扩展

构造函数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;

  • corePoolSize:线程池的核心线程数,一般情况下不管有没有任务都会一直在线程池中一直存活,只有在 ThreadPoolExecutor中的方法allowCoreThreadTimeOut(boolean value)设置为true时,闲置的核心线程会存在超时机制,如果在指定时间没有新任务来时,核心线程也会被终止,而这个时间间隔由第3个属性 keepAliveTime指定。
  • maximumPoolSize:线程池所能容纳的最大线程数,当活动的线程数达到这个值后,后续的新任务将会被阻塞。
  • keepAliveTime:控制线程闲置时的超时时长,超过则终止该线程。一般情况下用于非核心线程,只有在 ThreadPoolExecutor中的方法allowCoreThreadTimeOut(boolean value)设置为true时,也作用于核心线程。
  • unit:用于指定keepAliveTime参数的时间单位,TimeUnit是个enum枚举类型,常用的有:TimeUnit.HOURS(小时)TimeUnit.MINUTES(分钟)TimeUnit.SECONDS(秒)TimeUnit.MILLISECONDS(毫秒)等。
  • workQueue:线程池的任务队列,通过线程池的execute(Runnable command)方法会将任务Runnable存储在队列中。
  • threadFactory:线程工厂,它是一个接口,用来为线程池创建新线程的。
  • handler:拒绝策略,所谓拒绝策略,是指将任务添加到线程池中时,线程池拒绝该任务所采取的相应策略。

成员变量

/**
 * 任务阻塞队列 
 */
private final BlockingQueue<Runnable> workQueue; 
/**
 * 非公平的互斥锁(可重入锁)
 */
private final ReentrantLock mainLock = new ReentrantLock();
/**
 * 线程集合一个Worker对应一个线程,没有核心线程的说话,只有核心线程数
 */
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
 * 配合mainLock通过Condition能够更加精细的控制多线程的休眠与唤醒
 */
private final Condition termination = mainLock.newCondition();
/**
 * 线程池中线程数量曾经达到过的最大值。
 */
private int largestPoolSize;  
/**
 * 已完成任务数量
 */
private long completedTaskCount;
/**
 * ThreadFactory对象,用于创建线程。
 */
private volatile ThreadFactory threadFactory;  
/**
 * 拒绝策略的处理句柄
 * 现在默认提供了CallerRunsPolicy、AbortPolicy、DiscardOldestPolicy、DiscardPolicy
 */
private volatile RejectedExecutionHandler handler;
/**
 * 线程池维护线程(超过核心线程数)所允许的空闲时间
 */
private volatile long keepAliveTime;
/**
 * 允许线程池中的核心线程超时进行销毁
 */
private volatile boolean allowCoreThreadTimeOut;  
/**
 * 线程池维护线程的最小数量,哪怕是空闲的  
 */
private volatile int corePoolSize;
/**
 * 线程池维护的最大线程数量,线程数超过这个数量之后新提交的任务就需要进入阻塞队列
 */
private volatile int maximumPoolSize;

创建线程池

Executors提供获取几种常用的线程池的方法:

  • 缓存程线程池

newCachedThreadPool是一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。调用 execute() 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。注意,可以使用 ThreadPoolExecutor 构造方法创建具有类似属性但细节不同(例如超时参数)的线程池。

public static ExecutorService newCachedThreadPool() 
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                  60L, TimeUnit.SECONDS,
                  new SynchronousQueue<Runnable>());

  • 单线程线程池

newSingleThreadExecutor 创建是一个单线程池,也就是该线程池只有一个线程在工作,所有的任务是串行执行的,如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它,此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

public static ExecutorService newSingleThreadExecutor() 
  return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()));

  • 固定大小线程池

newFixedThreadPool 创建固定大小的线程池,每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小,线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) 
  return new ThreadPoolExecutor(nThreads, nThreads,
                  0L, TimeUnit.MILLISECONDS,
                  new LinkedBlockingQueue<Runnable>(),
                  threadFactory);

  • 单线程线程池

newScheduledThreadPool 创建一个大小无限的线程池,此线程池支持定时以及周期性执行任务的需求。

public static ScheduledExecutorService newScheduledThreadPool(
    int corePoolSize, ThreadFactory threadFactory) 
  return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);

public ScheduledThreadPoolExecutor(int corePoolSize,
                   ThreadFactory threadFactory) 
  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
      new DelayedWorkQueue(), threadFactory);

我们可以看出来上面的方法一共使用了DelayedWorkQueueLinkedBlockingQueueSynchronousQueue。这个就是线程核心之一的阻塞队列。

任务阻塞队列

它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列;

SynchronousQueue

1、直接提交队列:设置为SynchronousQueue队列,SynchronousQueue是一个特殊的BlockingQueue,它没有容量,每执行一个插入操作就会阻塞,需要再执行一个删除操作才会被唤醒,反之每一个删除操作也都要等待对应的插入操作。

使用SynchronousQueue队列,提交的任务不会被保存,总是会马上提交执行。如果用于执行任务的线程数量小于maximumPoolSize,则尝试创建新的进程,如果达到maximumPoolSize设置的最大值,则根据你设置的handler执行拒绝策略。因此这种方式你提交的任务不会被缓存起来,而是会被马上执行,在这种情况下,你需要对你程序的并发量有个准确的评估,才能设置合适的maximumPoolSize数量,否则很容易就会执行拒绝策略;

ArrayBlockingQueue

2、有界的任务队列:有界的任务队列可以使用ArrayBlockingQueue实现,如下所示:

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

使用ArrayBlockingQueue有界任务队列,若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,则会将新的任务加入到等待队列中。若等待队列已满,即超过ArrayBlockingQueue初始化的容量,则继续创建线程,直到线程数量达到maximumPoolSize设置的最大线程数量,若大于maximumPoolSize,则执行拒绝策略。在这种情况下,线程数量的上限与有界任务队列的状态有直接关系,如果有界队列初始容量较大或者没有达到超负荷的状态,线程数将一直维持在corePoolSize以下,反之当任务队列已满时,则会以maximumPoolSize为最大线程数上限。

LinkedBlockingQueue

3、无界的任务队列:无界任务队列可以使用LinkedBlockingQueue实现,如下所示:

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

使用无界任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是你corePoolSize设置的数量,也就是说在这种情况下maximumPoolSize这个参数是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意你任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。

PriorityBlockingQueue

4、优先任务队列:优先任务队列通过PriorityBlockingQueue实现:

任务会按优先级重新排列执行,且线程池的线程数一直为corePoolSize,也就是只有一个。

PriorityBlockingQueue其实是一个特殊的无界队列,它其中无论添加了多少个任务,线程池创建的线程数也不会超过corePoolSize的数量,只不过其他队列一般是按照先进先出的规则处理任务,而PriorityBlockingQueue队列可以自定义规则根据任务的优先级顺序先后执行。

其实LinkedBlockingQueue也是可以设置界限的,它默认的界限是Integer.MAX_VALUE。同时也支持也支持构造的时候设置队列大小。

拒绝策略

public interface RejectedExecutionHandler 
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);

Executor已经关闭,即执行了executorService.shutdown()方法后,或者Executor将有限边界用于最大线程和工作队列容量,且已经饱和时。使用方法execute()提交的新任务将被拒绝.
在以上述情况下,execute方法将调用其RejectedExecutionHandlerRejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor)方法。

AbortPolicy 默认的拒绝策略

也称为终止策略,遭到拒绝将抛出运行时RejectedExecutionException。业务方能通过捕获异常及时得到对本次任务提交的结果反馈。

public static class AbortPolicy implements RejectedExecutionHandler 
  public AbortPolicy()  
  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) 
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
  

CallerRunsPolicy

拥有自主反馈控制,让提交者执行提交任务,能够减缓新任务的提交速度。这种情况是需要让所有的任务都执行完毕。

public static class CallerRunsPolicy implements RejectedExecutionHandler 
    public CallerRunsPolicy()  
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) 
        if (!e.isShutdown()) 
            r.run();
        
    

DiscardPolicy

拒绝任务的处理程序,静默丢弃任务。使用此策略,我们可能无法感知系统的异常状态。慎用~!

public static class DiscardPolicy implements RejectedExecutionHandler 
    public DiscardPolicy()  
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) 
    

DiscardOldestPolicy

丢弃队列中最前面的任务,然后重新提交被拒绝的任务。是否要使用此策略需要看业务是否需要新老的替换,慎用~!

public static class DiscardOldestPolicy implements RejectedExecutionHandler 
    public DiscardOldestPolicy()  
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) 
        if (!e.isShutdown()) 
            e.getQueue().poll();
            e.execute(r);
        
    

内核

前面讲了线程池的外观,接下来讲述它的内核

线程池在内部实际上构建了一个生产者消费者模型,将线程任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。

线程池的运行主要分成两部分:任务管理、线程管理。

任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:

  1. 直接申请线程执行该任务;
  2. 缓冲到队列中等待线程执行;
  3. 拒绝该任务。

线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。

接下来,我们会按照以下三个部分去详细讲解线程池运行机制:

  1. 线程池如何维护自身状态。
  2. 线程池如何管理任务。
  3. 线程池如何管理线程。

线程池的生命周期

线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由内部来维护。

线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)。

在具体实现中,线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在了一起:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl这个AtomicInteger类型,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段.

它同时包含两部分的信息:线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。

用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者的一致,而占用锁资源。通过阅读线程池源代码也可以发现,经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式,相比于基本运算,速度也会快很多(PS:这种用法在许多源代码中都可以看到)。

关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码所示:

private static final int COUNT_BITS = Integer.SIZE - 3;//32-3
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;//低29位都为1,高位都为0

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;//111
private static final int SHUTDOWN   =  0 << COUNT_BITS;//000
private static final int STOP       =  1 << COUNT_BITS;//001
private static final int TIDYING    =  2 << COUNT_BITS;//010
private static final int TERMINATED =  3 << COUNT_BITS;//011

// Packing and unpacking ctl
//计算当前运行状态,取高三位
private static int runStateOf(int c)      return c & ~CAPACITY; 
//计算当前线程数量,取低29位
private static int workerCountOf(int c)   return c & CAPACITY; 
//通过状态和线程数生成ctl
private static int ctlOf(int rs, int wc)  return rs | wc; 

ThreadPoolExecutor的运行状态有5种,分别为:

运行状态状态描述
RUNNING能接受新提交的任务,并且也能处理阻塞队列中的任务
SHUTDOWN不能接受新提交的任务,但却可以继续处理阻塞队列中的任务
STOP不能接受新任务,也不能处理队列中的任务同时会中断正在处理的任务线程
TIDYING所有的任务都已经终止,workCount(有效线程数)为0
TERMINATED在terminated方法执行完之后进入该状态

任务调度机制

任务调度是线程池的主要入口,当用户提交了一个任务,接下来这个任务将如何执行都是由这个阶段决定的。了解这部分就相当于了解了线程池的核心运行机制。

首先,所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在线程池的运行状态运行线程数运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下:

  1. 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
  2. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
  3. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
  4. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
  5. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

接下来进入源代码分析时间~!

提交任务

//AbstractExecutorService.java
public <T> Future<T> submit(Callable<T> task) 
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;

//ThreadPoolExecutor.java
public void execute(Runnable command) 
  if (command == null)
    throw new NullPointerException();
  int c = ctl.get();//获取ctl
  //检查当前核心线程数,是否小于核心线程数的大小限制
  if (workerCountOf(c) < corePoolSize) 
    //没有达到核心线程数的大小限制,那么添家核心线程执行该任务
    if (addWorker(command, true))
      return;
    //如果添加失败,刷新ctl值
    c = ctl.get();
  
  //再次检查线程池的运行状态,将任务添加到等待队列中
  if (isRunning(c) && workQueue.offer(command)) 
    int recheck = ctl.get();//刷新ctl值
    //如果当前线程池的装不是运行状态,那么移除刚才添加的任务
    if (! isRunning(recheck) && remove(command))
      reject(command);//移除成功后,使用拒绝策略处理该任务;
    else if (workerCountOf(recheck) == 0)//当前工作线程数为0
      //线程池正在运行,或者移除任务失败。
      //添加一个非核心线程,并不指定该线程的运行任务。
      //等线程创建完成之后,会从等待队列中获取任务执行。
      addWorker(null, false);
   
  //逻辑到这里说明线程池已经不是RUNNING状态,或者等待队列已满,需要创建一个新的非核心线程执行该任务;
  //如果创建失败,那么非核心线程已满,使用拒绝策略处理该任务;
  else if (!addWorker(command, false))
    reject(command);

添加工作线程和执行任务

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
  Worker(Runnable firstTask) 
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;//初始化的任务,可以为null
    this.thread = getThreadFactory().newThread(this);//Worker持有的线程
  
  /**部分代码省略*/
	public void run() 
  	runWorker(this);
	

添加工作线程和执行任务:总体就是创建Worker,并且为它找到匹配的Runnable

添加工作线程

增加线程是通过线程池中的addWorker方法,该方法的功能就是增加一个线程,该方法不考虑线程池是在哪个阶段增加的该线程,这个分配线程的策略是在上个步骤完成的,该步骤仅仅完成增加线程,并使它运行,最后返回是否成功这个结果。

addWorker方法有两个参数:firstTaskcore

firstTask参数用于指定新增的线程执行的第一个任务,该参数可以为空;

core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSizefalse表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize

private boolean addWorker(Runnable firstTask, boolean core) 
    retry://breakcontinue的跳出标签
    for (;;) 深入源码分析Java线程池的实现原理

JAVA线程池原理与源码分析

JUC---java线程池原理及源码分析

JUC---java线程池原理及源码分析

深入源码,深度解析Java 线程池的实现原理

通过ThreadPoolExecutor源码分析线程池实现原理