ThreadPoolExecutor详解

Posted zhangxufeng

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ThreadPoolExecutor详解相关的知识,希望对你有一定的参考价值。

       ThreadPoolExecutor顾名思义,是一个线程池管理工具类,该类主要提供了任务管理,线程的调度和相关的hook方法来控制线程池的状态。

1.方法说明

任务管理主要方法如下:

public void execute(Runnable command);
public <T> Future<T> submit(Callable<T> task);
public <T> Future<T> submit(Runnable task, T result);
public Future<?> submit(Runnable task);
public void shutdown();
public List<Runnable> shutdownNow();

       上述方法中,execute()和submit()方法在有空闲线程存在的情况下会立即调用该线程执行任务,区别在于execute()方法是忽略任务执行结果的,而submit()方法则可以获取结果。除此之外,ThreadPoolExecutor还提供了shutdown()和shutdownNow()方法用于关闭线程池,区别在于shutdown()方法在调用之后会将任务队列中的任务都执行完毕之后再关闭线程池,而shutdownNow()方法则会直接关闭线程池,并且将任务队列中的任务导出到一个列表中返回。

       除上述用于执行任务的方法外,ThreadPoolExecutor还提供了如下几个hook(钩子)方法:

protected void beforeExecute(Thread t, Runnable r);
protected void afterExecute(Runnable r, Throwable t);
protected void terminated();

       在ThreadPoolExecutor中这几个方法默认都是空方法,beforeExecute()会在每次任务执行之前调用,afterExecute()会在每次任务结束之后调用,terminated()方法则会在线程池被终止时调用。使用这几个方法的方式就是声明一个子类继承ThreadPoolExecutor,并且在子类中重写需要定制的钩子方法,最后在创建线程池时使用该子类实例即可。

2.任务调度

a.相关参数

       对于ThreadPoolExecutor的实例化,其主要有如下几个重要的参数:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
                          TimeUnit unit, BlockingQueue<Runnable> workQueue, 
                          ThreadFactory threadFactory, RejectedExecutionHandler handler);
  • corePoolSize: 线程池核心线程的数量;
  • maximumPoolSize: 线程池可创建的最大线程数量;
  • keepAliveTime: 当线程数量超过了corePoolSize指定的线程数,并且空闲线程空闲的时间达到当前参数指定的时间时该线程就会被销毁,如果调用过allowCoreThreadTimeOut(boolean value)方法允许核心线程过期,那么该策略针对核心线程也是生效的;
  • unit: 指定了keepAliveTime的单位,可以为毫秒,秒,分,小时等;
  • workQueue: 存储未执行的任务的队列;
  • threadFactory: 创建线程的工厂,如果未指定则使用默认的线程工厂;
  • handler: 指定了当任务队列已满,并且没有可用线程执行任务时对新添加的任务的处理策略;
b.调度策略

       当初始化一个线程池之后,池中是没有任何用户执行任务的活跃线程的,当新的任务到来时,根据配置的参数其主要的执行任务如下:

  • 若线程池中线程数小于corePoolSize指定的线程数时,每来一个任务,都会创建一个新的线程执行该任务,无论线程池中是否已有空闲的线程;
  • 若当前执行的任务达到了corePoolSize指定的线程数时,也即所有的核心线程都在执行任务时,此时来的新任务会保存在workQueue指定的任务队列中;
  • 当所有的核心线程都在执行任务,并且任务队列中存满了任务,此时若新来了任务,那么线程池将会创建新线程执行任务;
  • 若所有的线程(maximumPoolSize指定的线程数)都在执行任务,并且任务队列也存满了任务时,对于新添加的任务,其都会使用handler所指定的方式对其进行处理。
c.调度策略注意点
  • 在第二步中,当前核心线程都在执行任务,并且任务队列已满时,会创建新的线程执行任务,这里需要注意的是,创建新线程的时候当前总共需要执行的任务数是(corePoolSize + workQueueSize),并不是只有corePoolSize个任务;
  • 在第三步中,这里workQueue主要有三种类型:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue,第一个是有界阻塞队列,第二个是无界阻塞队列,当然也可以为其指定界限大小,第三个是同步队列,对于ArrayBlockingQueue,其是需要指定队列大小的,当队列存满了任务线程池就会创建新的线程执行任务,对于LinkedBlockingQueue,如果其指定界限,那么和ArrayBlockingQueue区别不大,如果其不指定界限,那么其理论上是可以存储无限量的任务的,实际上能够存储Integer.MAX_VALUE个任务(还是相当于可以存储无限量的任务),此时由于LinkedBlockingQueue是永远无法存满任务的,因而maxPoolSize的设定将没有意义,一般其会设定为和corePoolSize相同的值,对于SynchronousQueue,其内部是没有任何结构存储任务的,当一个任务添加到该队列时,当前线程和后续添加任务的线程都会被阻塞,直至有一个线程从该队列中取出任务,当前线程才会被释放,因而如果线程池使用了该队列,那么一般corePoolSize都会设计得比较小,maxPoolSize会设计得比较大,因为该队列比较适合大量并且执行时间较短的任务的执行;
  • 在第四步中,DiscardPolicy和DiscardOldestPolicy一般不会配合SynchronousQueue使用,因为当同步队列阻塞了任务时,该任务都会被抛弃;对于AbortPolicy,因为如果队列已满,那么其会抛出异常,因而使用时需要小心;对于CallerRunsPolicy,由于当有新的任务到达时会使用调用线程执行当前任务,因而使用时需要考虑其对服务器响应的影响,并且还需要注意的是,相对于其他几个策略,该策略不会抛弃任务到达的任务,因为如果到达的任务使队列满了而只能使用调用线程执行任务时,说明线程池设计得不够合理,如果任其发展,那么所有的调用线程都可能会被需要执行的任务所阻塞,导致服务器出现问题。

3.源码讲解

a.主要属性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 32
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// 00011111 11111111 11111111 11111111

private static final int RUNNING    = -1 << COUNT_BITS; // 11100000 00000000 00000000 00000000
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 00000000 00000000 00000000 00000000
private static final int STOP       =  1 << COUNT_BITS; // 00100000 00000000 00000000 00000000
private static final int TIDYING    =  2 << COUNT_BITS; // 01000000 00000000 00000000 00000000
private static final int TERMINATED =  3 << COUNT_BITS; // 01100000 00000000 00000000 00000000

       由于ThreadPoolExecutor需要管理多种状态,并且还要记录当前执行任务的线程的数量,如果使用多个变量,并发更新时管理将会非常复杂,这里ThreadPoolExecutor则主要使用一个AtomicInteger类型的变量ctl存储所有主要的信息。ctl是一个32位的整形数字,初始值为0,其最高的三位用于存储当前线程池的状态信息,主要有RUNNING,SHUTDOWN,STOP,TIDING和TERMINATED,分别表示运行状态,关闭状态,终止状态,整理状态和结束状态。这几种状态对应的具体数值信息如上述代码所示,这里需要注意的一点是,在ThreadPoolExecutor中,这几种状态在数值上是从小到大依次增大的,并且状态流转也是依次往下的,这就为其判断状态信息提供了比较便利的方式,如当需要判断线程池状态是否处于SHUTDOWN状态时,只需要判断其代表状态位部分的值是否等于SHUTDOWN即可。在ctl中,除了最高三位用于表示状态外,其余位所代表的数值则指定了当前线程池中正在执行任务的线程数。如下是操作ctl属性的相关方法:

private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

private static boolean runStateLessThan(int c, int s) {
  return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
  return c >= s;
}

private static boolean isRunning(int c) {
  return c < SHUTDOWN;
}

private boolean compareAndIncrementWorkerCount(int expect) {
  return ctl.compareAndSet(expect, expect + 1);
}

private boolean compareAndDecrementWorkerCount(int expect) {
  return ctl.compareAndSet(expect, expect - 1);
}
  • runStateOf(int c): 用于获取当前线程池的状态,c为当前线程池工作时的ctl属性值;
  • workerCountOf(int c): 用于获取当前线程池正在工作的线程数量,c为当前线程池工作时的ctl属性值;
  • ctlOf(int rs, int wc): 这里rs表示当前线程的工作状态,wc则表示正在工作的线程数,该方法用于将这两个参数组装为一个ctl属性值;
  • runStateLessThan(int c, int s): 判断当前线程池状态是否未达到指定状态,如前所述,状态流转在数值上是依次增大的,因而这里只需要判断其大小即可;
  • runStateAtLeast(int c, int s): 用于判断当前线程池状态是否至少处于某种状态;
  • isRunning(int c): 用于判断当前线程池是否处于正常运行状态;
  • compareAndIncrementWorkerCount(int expect): 增加当前线程池的工作线程数量值;
  • compareAndDecrementWorkerCount(int expect): 减少当前线程池的工作线程数量值。
b.主要方法

       对于线程池的execute()和submit()方法,其实在底层submit()方法会将传入的任务封装为一个FutureTask对象,由于FutureTask对象是实现了Runnable接口的,因而其也可以当做一个任务执行,这里就是将封装后的FutureTask对象传递给execute()方法执行的。我们这里则主要讲解execute()方法的实现方式,如下是execute()方法的代码:

public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();

  int c = ctl.get();    // 获取当前线程池状态
  if (workerCountOf(c) < corePoolSize) {
    // 当工作线程数小于核心线程数时,则调用addWorker()方法创建线程并执行任务
    if (addWorker(command, true))
      return;
    c = ctl.get();  // 若添加失败,则更新当前线程池状态
  }
  
  // 执行到此处,则说明线程池中的工作线程要么大于等于核心线程数,要么当前线程池已经被命令关闭了(addWorker方法添加失败的原因),因而这里判断线程池是否为RUNNING状态,是则将任务添加到任务队列中
  if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    // 添加队列成功后双重验证,确保线程池处于正确状态
    if (! isRunning(recheck) && remove(command))
      reject(command);
    else if (workerCountOf(recheck) == 0)
      addWorker(null, false);   // 若线程池中没有线程,则创建一个新线程执行添加的任务
  } else if (!addWorker(command, false))
    reject(command);    // 线程池至少处于SHUTDOWN状态,拒绝当前任务的执行
}

       在execute()方法中,其首先判断线程池工作线程数是否小于核心线程数,是则创建核心线程执行任务,添加失败或者工作线程数大于等于核心线程数时,则将任务添加到任务队列中,添加成功后会进行双重验证确保当前线程池处于正确的状态,并且确保当前有可用的线程执行新添加的任务。由此可见对于execute()方法的实现,其比较核心的方法是addWorker()方法,如下是addWorker()方法的实现方式:

private boolean addWorker(Runnable firstTask, boolean core) {
  retry:
  for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c); // 获取当前运行状态

    // 判断当前线程池是否至少为SHUTDOWN状态,并且firstTask和任务队列中没有任务,是则直接返回
    if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
      return false;

    for (;;) {
      int wc = workerCountOf(c);
      // 判断是否工作线程数大于可记录的最大线程数,或者工作线程超过了指定的核心线程或者最大线程数
      if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
      // 走到这一步说明当前线程池处于RUNNING状态,或者任务队列存在任务,并且工作线程数不超过
      // 指定的线程数量,那么就增加工作线程数量,成功则继续往下执行,失败则重复上述添加步骤
      if (compareAndIncrementWorkerCount(c))
        break retry;
      c = ctl.get();
      if (runStateOf(c) != rs)
        continue retry;
    }
  }

  // 记录工作线程数的变量已经更新,接下来创建线程执行任务
  boolean workerStarted = false;
  boolean workerAdded = false;
  Worker w = null;
  try {
    w = new Worker(firstTask);  // 创建一个工作者对象
    final Thread t = w.thread;
    if (t != null) {
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
        int rs = runStateOf(ctl.get());

        // 重新检查线程池状态,或者是判断当前是SHUTDOWN状态,而firstTask为空,这说明任务队列此时不为空
        if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
          if (t.isAlive())
            throw new IllegalThreadStateException();
          workers.add(w);   // 将创建的工作者添加到工作者集合中
          int s = workers.size();
          if (s > largestPoolSize)
            largestPoolSize = s;    // 更新已使用的最大线程数
          workerAdded = true;
        }
      } finally {
        mainLock.unlock();
      }
      if (workerAdded) {
        t.start();  // 工作者对象成功创建之后,调用该工作者执行任务
        workerStarted = true;
      }
    }
  } finally {
    if (!workerStarted)
      addWorkerFailed(w);
  }
  return workerStarted;
}

       在addWorker()方法中,其首先检查当前线程池是否处于RUNNING状态,或者处于SHUTDOWN状态,但是任务队列中还存在有任务,那么其就会创建一个新的Worker对象,并且将其添加到工作者对象集合中,然后调用工作者对象所维护的线程执行任务,如下是工作者对象的实现代码:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
  private static final long serialVersionUID = 6138294804551838833L;
  final Thread thread;  // 当前工作者中执行任务的线程
  Runnable firstTask;   // 第一个需要执行的任务
  volatile long completedTasks; // 当前工作者完成的任务数

  Worker(Runnable firstTask) {
    // 默认设置为-1,那么如果不调用当前工作者的run()方法,那么其状态是不会改变的,
    // 其他的线程也无法使用当前工作者执行任务,在run()方法调用的runWorker()方法中会
    // 调用unlock()方法使当前工作者处于正常状态
    setState(-1);
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);   // 使用线程工厂创建线程
  }

  public void run() {
    runWorker(this);    // 使用当前工作者执行任务
  }

  protected boolean isHeldExclusively() {
    return getState() != 0;
  }

  protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
      setExclusiveOwnerThread(Thread.currentThread());
      return true;
    }
    return false;
  }

  protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
  }

  public void lock()        { acquire(1); }
  public boolean tryLock()  { return tryAcquire(1); }
  public void unlock()      { release(1); }
  public boolean isLocked() { return isHeldExclusively(); }

  // 如果当前线程已经在执行任务,那么将其标记为打断状态,待其任务执行完毕则终止任务的执行
  void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
      try {
        t.interrupt();
      } catch (SecurityException ignore) {
      }
    }
  }
}

       在工作者对象中,其主要维护了一个工作者线程,用于执行任务。该工作者对象继承了AbstractQueuedSynchronizer,用于控制当前工作者工作状态的获取,并且其也实现了Runnable接口,将主要任务的执行封装到run()方法中。如下是runWorker()方法的具体实现:

final void runWorker(Worker w) {
  Thread wt = Thread.currentThread();
  Runnable task = w.firstTask;
  w.firstTask = null;
  w.unlock();   // 重置Worker对象的状态
  boolean completedAbruptly = true;
  try {
    // 首先执行工作者线程中的任务,然后循环从任务队列中获取任务执行
    while (task != null || (task = getTask()) != null) {
      w.lock();
      // 检查当前线程池的状态,如果线程池被终止或者线程池终止并且当前线程已被打断
      if ((runStateAtLeast(ctl.get(), STOP) ||
           (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
        wt.interrupt();
      try {
        beforeExecute(wt, task);    // 调用钩子方法进行预处理
        Throwable thrown = null;
        try {
          task.run();   // 执行任务
        } catch (RuntimeException x) {
          thrown = x; throw x;
        } catch (Error x) {
          thrown = x; throw x;
        } catch (Throwable x) {
          thrown = x; throw new Error(x);
        } finally {
          afterExecute(task, thrown);   // 调用钩子方法进行任务完成后的处理工作
        }
      } finally {
        task = null;    // 重置工作者的初始任务
        w.completedTasks++;
        w.unlock();
      }
    }
    completedAbruptly = false;
  } finally {
    processWorkerExit(w, completedAbruptly);
  }
}

       可以看到,在runWorker()方法中,其首先会执行工作者对象的初始化任务,当执行完毕后会通过一个无限循环不断在任务队列中获取任务执行。如下是getTask()方法的源码:

private Runnable getTask() {
  boolean timedOut = false;

  for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // 判断当前线程是否处于STOP状态,或者处于SHUTDOWN状态,并且工作队列是空的,是则不返回任务
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
      decrementWorkerCount();
      return null;
    }

    int wc = workerCountOf(c);
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;    // 是否允许空闲线程过期

    // 工作线程数大于最大允许线程数,或者线程在指定时间内无法从工作队列中获取到新任务,则销毁当前线程
    if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
      if (compareAndDecrementWorkerCount(c))
        return null;
      continue;
    }

    try {
      // 允许核心线程过期或者工作线程数大于corePoolSize时,从任务队列获取任务时会指定等待时间,
      // 否则会一直等待任务队列中新的任务
      Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
      if (r != null)
        return r;
      timedOut = true;
    } catch (InterruptedException retry) {
      timedOut = false;
    }
  }
}

       可以看到,getTask方法首先会判断当前线程池状态是否为STOP状态,或者是SHUTDOWN状态,并且任务队列是空的,是则不返回任务,否则会根据相关参数从任务队列中获取任务执行。

       以上execute()方法的主要实现步骤,在ThreadPoolExecutor中另一个至关重要的方法则是shutdown()方法,以下是shutdown()方法的主要代码:

public void shutdown() {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    checkShutdownAccess();  // 检查对线程状态的控制权限
    advanceRunState(SHUTDOWN);  // 更新当前线程池状态为SHUTDOWN
    interruptIdleWorkers(); // 打断空闲的工作者
    onShutdown();   // 钩子方法,但是没有对外公开,因为该方法只有包访问权限
  } finally {
    mainLock.unlock();
  }
  tryTerminate();   
}

       在shutdown()方法中,其首先检查当前线程是否有修改线程状态的权限,然后将当前线程池的状态修改为SHUTDOWN,接着调用interruptIdleWorkers()方法中断所有处于空闲状态的线程,最后则是调用tryTerminate()方法尝试将当前线程池的状态由SHUTDOWN修改为TERMINATED,这里interruptIdleWorkers()方法最终会调用其重载方法interruptIdleWorkers(boolean)方法,该方法代码如下:

private void interruptIdleWorkers(boolean onlyOne) {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    for (Worker w : workers) {
      Thread t = w.thread;
      if (!t.isInterrupted() && w.tryLock()) {
        try {
          t.interrupt();
        } catch (SecurityException ignore) {
        } finally {
          w.unlock();
        }
      }
      if (onlyOne)
        break;
    }
  } finally {
    mainLock.unlock();
  }
}

       可以看到,该方法会遍历所有的工作者对象,如果其处于空闲状态,则将其终止。对于处于工作状态的线程,由于在shutdown()方法中已经将当前线程池的状态设置为SHUTDOWN,那么工作状态的线程会将任务队列中的任务都执行完毕之后自动销毁。

       本文主要讲解了ThreadPoolExecutor的主要方法,线程池的调度方式,以及其核心功能的实现原理,如本文有任何不当之处,敬请指正,谢谢!

以上是关于ThreadPoolExecutor详解的主要内容,如果未能解决你的问题,请参考以下文章

线程详解之线程池ThreadPoolExecutor

线程详解之线程池ThreadPoolExecutor

线程详解之线程池ThreadPoolExecutor

ThreadPoolExecutor运转机制及BlockingQueue详解

ThreadPoolExecutor详解

ThreadPoolExecutor详解