线程池及Executor框架

Posted yufeng218

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池及Executor框架相关的知识,希望对你有一定的参考价值。

1. 线程的实现方式

讲线程池之前我们先看一下线程的实现方式: Thread,Runnable,Callable

// 实现Runnable接口的类将被Thread执行,表示一个基本的任务
public interface Runnable {
    // run方法就是它所有的内容,就是实际执行的任务
    public abstract void run()?
}
//Callable同样是任务,与Runnable接口的区别在于它接收泛型,同时它执行任务后带有返回内容
public interface Callable<V> {
    // 相对于run方法的带有返回值的call方法
    V call() throws Exception?
}

2. 线程池介绍

  在web开发中,服务器需要接受并处理请求,所以会为一个请求来分配一个线程来进行处理。如果每次请求都新创建一个线程的话实现起来非常简便,但是存在一个问题:如果并发的请求数量非常多,但每个线程执行的时间很短,这样就会频繁的创建和销毁线程,如此一来会大大降低系统的效率。可能出现服务器在为每个请求创建新线程和销毁线程上花费的时间和消耗的系统资源要比处理实际的用户请求的时间和资源更多。

2.1 线程池优势

(1)降低资源的消耗。通过重用存在的线程,减少线程创建,消亡的开销,提高性能。

(2)提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。

(3)提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

2.2 线程池的实现原理

ThreadPoolExecutor示意 :

技术图片

技术图片

ThreadPoolExecutor执行execute方法分下面4种情况:
(1)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
(2)如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
(3)如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要取全局锁)。
(4)如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

2.3  线程池的创建(ThreadPoolExecutor)

public ThreadPoolExecutor(int corePoolSize,
			  int maximumPoolSize,
			  long keepAliveTime,
			  TimeUnit unit,
			  BlockingQueue<Runnable> workQueue,
			  ThreadFactory threadFactory,
			  RejectedExecutionHandler handler)   
  • corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。
  • maximumPoolSize线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;
  • keepAliveTime:线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime;
  • unit:keepAliveTime的单位。
  • workQueue:用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:

    a. ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;

    b. LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
      c. SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
    d. priorityBlockingQuene:具有优先级的无界阻塞队列;

  • threadFactory:它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。
  • handler:线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:

    a.  AbortPolicy:直接抛出异常,默认策略;
    b. CallerRunsPolicy:用调用者所在的线程来执行任务;
    c.  DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    d. DiscardPolicy:直接丢弃任务;
    上面的4种策略都是ThreadPoolExecutor的内部类。当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

public class PolicySample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3,
                5,
                3,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardPolicy());
        Future<String> future = null;
        List<Future<String>> list = new ArrayList<Future<String>>();

        for (int i=0;i<20;i++){
            list.add(pool.submit(new CallTask()));
        }

        if(!list.isEmpty()){
            for (Future<String> f : list){
                System.out.println(f.get());
            }
        }
    }
}

2.4 线程池的5种状态

(1)RUNNING
  a. 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
  b. 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0!
(2)SHUTDOWN
  a. 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
  b. 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。
(3)STOP
  a. 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
  b. 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。
(4)TIDYING
  a. 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
  b. 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。
(5)TERMINATED
  a.  状态说明:线程池彻底终止,就变成TERMINATED状态。
  b. 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。
  进入TERMINATED的条件如下:

    I. 线程池不是RUNNING状态;

    II. 线程池状态不是TIDYING状态或TERMINATED状态;

    III. 如果线程池状态是SHUTDOWN并且workerQueue为空;

    IV. workerCount为0;

    V. 设置TIDYING状态成功。

 

 技术图片

3. Executor框架

Executor接口是线程池框架中最基础的部分,定义了一个用于执行Runnable的execute方法。
下图为它的继承与实现

 

技术图片

 从图中可以看出Executor下有一个重要子接口ExecutorService,其中定义了线程池的具体行为:

  • execute(Runnable command):履行Ruannable类型的任务,
  • submit(task):可用来提交Callable或Runnable任务,并返回代表此任务的Future对象
  • shutdown():在完成已提交的任务后封闭办事,不再接管新任务,
  • shutdownNow():停止所有正在履行的任务并封闭办事。
  • isTerminated():测试是否所有任务都履行完毕了。
  • isShutdown():测试是否该ExecutorService已被关闭。

4. 线程池的源码分析

4.1 线程池的重点属性

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0))?
private static final int COUNT_BITS = Integer.SIZE ­ 3?
private static final int CAPACITY = (1 << COUNT_BITS) ­ 1?

  ctl 是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),这里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。

4.2 ThreadPoolExecutor 线程池的 execute 方法

public void execute(Runnable command) {
  if (command == null)
	  throw new NullPointerException()?
		
  //clt记录着runState和workerCount; 高3位为状态,低29位是线程数
  int c = ctl.get()?
	
  /* 1.workerCountOf方法取出低29位的值,表示当前活动的线程数;
  * 如果当前活动线程数小于corePoolSize,则新建一个线程放入线程池中;
  * 并把任务添加到该线程中。
  */
  if (workerCountOf(c) < corePoolSize) {
	/* addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断;
	* 如果为true,根据corePoolSize来判断;
	* 如果为false,则根据maximumPoolSize来判断
	*/
	if (addWorker(command, true))
	  return?
	  // 如果添加失败,则重新获取ctl值
	  c = ctl.get()?
  }
	
  //2. 如果当前线程池是运行状态并且任务添加到队列中 
  if (isRunning(c) && workQueue.offer(command)) {
	  // 重新获取ctl值
	  int recheck = ctl.get()?
		
	  // 再次判断线程池的运行状态,如果不是运行状态,由于之前已经把command添加到workQueue中了,
	  // 这时需要移除该command,执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回
	  if (!isRunning(recheck) && remove(command))
		  reject(command)?
	
	  //获取线程池中的有效线程数,如果数量是0,则执行addWorker方法
	  else if (workerCountOf(recheck) == 0)
		  /*这里传入的参数表示:
		  * 1. 第一个参数为null,表示在线程池中创建一个线程,但不去启动;
		  * 2. 第二个参数为false,将线程池的有限线程数量的上限设置为 maximumPoolSize,添加线程时根据maximumPoolSize来判断。
		  */
		  addWorker(null, false)?
  }
  /*
  * 如果执行到这里,有两种情况:
	* 1. 线程池已经不是RUNNING状态;
	* 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且 workQueue已满。
	* 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
	* 如果失败则拒绝该任务
  */
  else if (!addWorker(command, false))
	  reject(command)?
}

 

  

 

以上是关于线程池及Executor框架的主要内容,如果未能解决你的问题,请参考以下文章

转:Java并发编程之十九:并发新特性—Executor框架与线程池(含代码)

Java Executor线程池框架的概述

java多线程之Executor框架线程池详细介绍与ThreadPoolExecutor

Java 并发编程——Executor框架和线程池原理

Java 并发Executor框架机制与线程池配置使用

Java线程池 Executor框架概述