线程池

Posted billxxx

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池相关的知识,希望对你有一定的参考价值。

一、线程池介绍、作用

线程池可以看做是线程的集合。在没有任务时线程处于空闲状态,当请求到来:线程池给这个请求分配一个空闲的线程,任务完成后回到线程池中等待下次任务(而不是销毁)。这样就实现了线程的重用

我们来看看如果没有使用线程池的情况是这样的:

 

  • 为每个请求都新开一个线程

 

 

public class ThreadPerTaskWebServer {
    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable task = () -> handleRequest(connection);
            new Thread(task).start();// 为每个请求都创建一个新的线程
        }
    }
    private static void handleRequest(Socket connection) {
        // request-handling logic here
    }
}


为每个请求都开一个新的线程虽然理论上是可以的,但是会有缺点
  • 线程生命周期的开销非常高。每个线程都有自己的生命周期,创建和销毁线程所花费的时间和资源可能比处理客户端的任务花费的时间和资源更多,并且还会有某些空闲线程也会占用资源
  • 程序的稳定性和健壮性会下降,每个请求开一个线程。如果受到了恶意攻击或者请求过多(内存不足),程序很容易就奔溃掉了。

所以说:我们的线程最好是交由线程池来管理,这样可以减少对线程生命周期的管理,一定程度上提高性能。

 

 

 

 

二、JDK提供的线程池API

JDK给我们提供了Excutor框架来使用线程池,它是线程池的基础

  • Executor提供了一种将“任务提交”与“任务执行”分离开来的机制(解耦)

下面我们来看看JDK线程池的总体api架构:

技术图片

接下来我们把这些API都过一遍看看:

 

2.1 Executor接口:

一个对象,执行提交的Runnable任务。 
public interface Executor {
    /**
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}
 
 

 

该接口将任务的提交 与 任务如何进行执行(包括线程的使用、调度等) 进行解耦。通常使用 Executor  代替 显式创建线程。
//例如,相比较于使用如下代码
new Thread(new(RunnableTask())).start();
//对每个任务进行执行调用,我们可以使用Executor像下面的方案
Executor executor = getAnExecutor();
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
 
 
Executor接口并没有严格地要求执行是异步的。 
最简单的情况下,executor 可以在 调用者的线程 中立即运行提交的任务,如下所示:
class DirectExecutor implements Executor {
    public void execute(Runnable r) {
        r.run();
    }
}
 
典型的操作是任务不在调用者的线程中执行,下面的例子对于每个task 都新建一个线程:
class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) {
        new Thread(r).start();
    }
}
 

更多的Executor 实现对 任务如何以及何时执行 设定了一些限制。下面的executor将串行化任务提交到第二个executor,实现类复合executor。

class SerialExecutor implements Executor {
    final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
    final Executor executor; //内部的第二个executor
    Runnable active;

    SerialExecutor(Executor executor) {
        this.executor = executor;
    }

    public synchronized void execute(final Runnable r) {
        tasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext(); //当前task执行完后,执行队列的下一个task
                }
            }
        });
        if (active == null) {
            scheduleNext();
        }
    }

    protected synchronized void scheduleNext() {
        if ((active = tasks.poll()) != null) {
            executor.execute(active);
        }
    }
}
 
 

ExecutorService ,这是一个更广泛的接口。 ThreadPoolExecutor类提供一个可扩展的线程池实现。 Executors工具类为这些Executor提供了便捷的工厂方法。

在线程T中提交task给executor, 在提交任务之前的action , happen-before 于task的执行,可能在其他的线程;

Actions in a thread prior to submitting a  Runnable object to an Executor happen-before  its execution begins, perhaps in another thread.

 

2.2 ExecutorService 接口

技术图片
该接口提供了一些shutdown 相关的方法,允许用户停止当前Executor中的任务;下面是一个处理网络请求的executor使用方案实现:
class NetworkService implements Runnable {
    private final ServerSocket serverSocket;
    private final ExecutorService pool;
    public NetworkService(int port, int poolSize)
        throws IOException {
        serverSocket = new ServerSocket(port);
        pool = Executors.newFixedThreadPool(poolSize);//使用Executors工具类的工厂方法获取到ExecutorService实例
    }
    public void run() { // run the service
        try {
            for (;;) {  //循环等待请求进入
                pool.execute(new Handler(serverSocket.accept()));
            }
        } catch (IOException ex) {
            pool.shutdown();
        }
    }
}
class Handler implements Runnable {
    private final Socket socket;
    Handler(Socket socket) { this.socket = socket; }
    public void run() {
        // read and service request on socket
    }
}

///////////////////////////////////////////////////////
//下面代码是停止所用任务的实现方案
void shutdownAndAwaitTermination(ExecutorService pool) {
    pool.shutdown(); // Disable new tasks from being submitted
    try {
        // Wait a while for existing tasks to terminate
        if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
            pool.shutdownNow(); // Cancel currently executing tasks
            // Wait a while for tasks to respond to being cancelled
            if (!pool.awaitTermination(60, TimeUnit.SECONDS))
                System.err.println("Pool did not terminate");
        }
    } catch (InterruptedException ie) {
        // (Re-)Cancel if current thread also interrupted
        pool.shutdownNow();
        // Preserve interrupt status
        Thread.currentThread().interrupt();
    }
}
 
 
 

2.3 AbstractExecutorService类:

技术图片
该类 提供了ExecutorService接口中的方法的默认实现。 此类实现了submit , invokeAny和invokeAll等方法,这些方法通过newTaskFor方法返回一个RunnableFutureRunnableFuture的默认实现为FutureTask类。 子类通过覆盖newTaskFor方法 来返回自定义的RunnableFuture实现 。

 

2.4 ScheduledExecutorService接口:

技术图片
一个ExecutorService 的衍生定义接口 ,可安排命令以给定的延迟后运行,或定期地执行。
schedule方法创建各种延迟任务,并返回一个可用于取消或检查执行任务的对象。 scheduleAtFixedRate 和 scheduleWithFixedDelay方法创建并执行任务,让他定期运行,直到取消。
命令的提交如果 使用Executor.execute(Runnable)和ExecutorService submit方法,则请求延迟为0 。 零和负的延迟时间(但不是周期)也被允许传入schedule方法,并且被视为立即执行的请求。
所有schedule方法需要 接受相对延迟和周期作为参数,而不是绝对的时间或日期。 这是一个简单的数据变换,把java.util.Date表示的绝对时间转换为所需形式。 例如,要安排在未来某一date 执行任务,可以使用: 
schedule(task, date.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
 
 
下面是一个类,设置了一个ScheduledExecutorService,在一小时内每十秒钟发出一次beep。
import static java.util.concurrent.TimeUnit.*;
class BeeperControl {
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    public void beepForAnHour() {
        final Runnable beeper = new Runnable() {
            public void run() { System.out.println("beep"); }
        };
        final ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(beeper, 10, 10, SECONDS);
        scheduler.schedule(new Runnable() {
            public void run() { beeperHandle.cancel(true); }
        }, 60 * 60, SECONDS); //一个小时后取消
    }
}
 
 

 

2.5 ThreadPoolExecutor类:

这个是用的最多的线程池类;详见本文第三部分

 

2.6  ScheduledThreadPoolExecutor类:

相当于 提供了 延迟执行 和  周期执行的  ThreadPoolExecutor类;

 

 

 

 

2.7  ForkJoinPool线程池

除了ScheduledThreadPoolExecutor和ThreadPoolExecutor类线程池以外,还有一个是JDK1.7新增的线程池:ForkJoinPool线程池

 

技术图片
JDK1.7中新增的一个线程池,与ThreadPoolExecutor一样,同样继承了AbstractExecutorService。ForkJoinPool是Fork/Join框架的两大核心类之一。与其它类型的ExecutorService相比,其主要的不同在于采用了工作窃取算法(work-stealing):所有池中线程会尝试找到并执行已被提交到池中的或由其他线程创建的任务。这样很少有线程会处于空闲状态,非常高效。这使得能够有效地处理以下情景:大多数由任务产生大量子任务的情况;从外部客户端大量提交小任务到池中的情况。

 

 

2.8  Callable和Future

Callable就是Runnable的扩展

 

  • Runnable没有返回值,不能抛出受检查的异常,而Callable可以

 

 

技术图片

也就是说:当我们的任务需要返回值的时,我们就可以使用Callable!

Future一般我们认为是Callable的返回值,但他其实代表的是任务的生命周期(当然了,它是能获取得到Callable的返回值的)

技术图片
 
举例代码如下:
public class CallableDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 创建线程池对象
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // 可以执行Runnable对象或者Callable对象代表的线程
        Future<Integer> f1 = pool.submit(new MyCallable(100));
        Future<Integer> f2 = pool.submit(new MyCallable(200));
        // V get()
        Integer i1 = f1.get();
        Integer i2 = f2.get();
        System.out.println(i1 + " " + i2);
        // 结束,关闭线程池
        pool.shutdown();
    }
}
//自定义的 task任务
public class MyCallable implements Callable<Integer> {
    private int number;
    public MyCallable(int number) {
        this.number = number;
    }
    @Override
    public Integer call() throws Exception {
        int sum = 0;
        for (int x = 1; x <= number; x++) {
            sum += x;
        }
        return sum;
    }
}
 
 

 

 

 

三.  ThreadPoolExecutor详解

根据ThreadPoolExecutor类的头部JavaDoc文档得到信息如下:
  • 使用线程池中的线程完成 提交来的任务, 一般使用Executors类中的工厂方法进行配置线程池;
  • 使用线程池为了解决两个问题: 
    • 解决执行大量异步线程时的消耗问题,提高性能,减少每个任务调用时的开销,
    • 提供了一些方法来 约束和管理 执行任务时消耗的资源,包括线程资源等。
  • 为了可以在各种情况下使用,该类提供了很好的扩展性。但一般情况下,使用Executors提供的以下一些工厂方法以及足够使用了。
    • newCachedThreadPool() ,无限制线程池,带有*自动线程回收
    • newFixedThreadPool(int),  固定大小的线程池
    • newSingleThreadExecutor(),单后台线程
  • 自定义调整该线程池类时,需要了解以下的规则:
    • poolSize: 线程池的大小;  ThreadPoolExecutor 会根据 corePoolSize 和 MaximumPoolSize  调整线程池的大小。
      如果运行的线程小于corePoolSize,则创建新的线程处理请求,即使其他线程是空闲的。
      如果运行的线程数大于corePoolSize 小于 maximumPoolSize, 则仅当队列满是才创建新的线程。
      如果设置的 corePoolSize 和 maximumPoolSize 相等, 则该线程池是一个固定大小的线程池。
      若设置maximumPoolSize为一个实质上的无界value例如Integer.MAX_VALUE,那就是设置线程池为一个无限制的池子。
      corePoolSize 和 maximumPoolSize一般在构造方法中设置,但也可以使用setXXX方法进行设置。
    • 按需构造。默认情况下只有新任务请求过来时才会初始化线程。当然也可以通过重写  prestartCoreThread() 或 prestartAllCoreThreads()等方法来提前初始化线程池中的线程。
    • 创建新线程。  新线程通过 ThreadFactory创建。 默认情况下使用Executors.defaultThreadFactory()获得的线程工厂,创建的线程都是属于同一个线程组、相同优先级、没有守护线程。 允许自定义设置一个 ThreadFactory,对新线程的属性进行改造。 
      如果下次创建失败,会return一个null,执行器会继续执行,但是不会执行任务。
      Threads  should possess the "modifyThread" {@code RuntimePermission}. If  worker threads or other threads using the pool do not possess this permission, service may be degraded: configuration changes may not  take effect in a timely manner, and a shutdown pool may remain in a  state in which termination is possible but not completed. (这段文字没看懂什么意思)
    • 线程空闲时间。如果线程数大于corePoolSize,  并且空闲线程中存在某些线程的空闲时间大于keepAliveTime,这些线程将被销毁。
      上面的方案保证了线程池在不被积极使用时 就降低他的资源消耗。
      允许使用setXX方法修改这个生存时间。使用一个无界值比如Long.MAX_VALUE可以保证在线程池关闭前 空闲进程都不会被销毁。
      默认情况下 该超时策略不会作用在coreThread上, 但使用allowCoreThreadTimeOut(boolean)方法可以将该策略也作用到核心thread上,只要keepAliveTime不等于0。
    • 队列配置。任何的 阻塞队列都可以用来管理 提交来的task。 队列的运作机制 和当前的 poolSize有关。
          1. 如果少于corePoolSize个线程正在运行,执行器会添加一个线程来执行任务,而不是把task加入到队列中。
          2. 如果线程数大于等于corePoolSize,则执行器会将task加入到队列中, 而不是创建一个新的线程。
          3. 如果队列满了,则会创建一个新的线程来处理task,除非超过了最大线程数,这样的话就会拒绝这个task。
      三种排队策略: 
          1. 直接移交。该策略不会将任务放入队列,而是直接交给线程执行,如果没有空闲线程则创建。该策略一般适用于无界限线程池的情况。
          2. 无界限队列。 如果所有 coreThread都在忙,则当前提交的任务就会入队等待。因为队列无界,所以不会创建多于corePoolSize数量的线程。该方法适用于每个task之间互相不依赖,so task之间不会影响。 例如在web page server中,这种策略对于短暂的突发请求很有用,但是当任务进入的速度比处理的速度要快时,该策略会造成队列无限制增长。
          3.  有界限队列。 避免了资源耗尽的情况的发生。但该策略更难调整和控制。队列大小和最大线程池大小可能会相互折衷:(1)使用大队列和小线程池可以最大程度地减少 CPU使用率、操作系统资源和上下文切换开销,但会导致人为地降低吞吐量。如果任务经常阻塞(例如它们是I/O绑定),则系统应该可以为线程安排比您原本允许的时间更多的时间。(2)使用小队列*通常需要更大的池大小,这会使CPU繁忙,但是*可能会遇到不可接受的调度开销,这也会*降低吞吐量。
    • 任务的拒绝 。 当出现 线程池关闭、 或者  线程数量满了且队列饱和,  以上两种情况都会导致任务拒绝:有四种拒绝策略:
          1. 默认策略, 直接抛出异常, RejectedExecutionException。
          2. 使用调用者所在的线程 来执行任务。
          3.  直接丢掉这个任务。
          4. 丢弃掉队列中最老的那个任务。
    • Hook 方法。提供protected的可重写方法 beforeExecute(Thread, Runnable) 和 afterExecute(Runnable, Throwable) ,在每个任务执行之前和之后被调用。这些可用于操纵执行的环境。例如,重新初始化 ThreadLocals,收集统计信息或添加日志条目。*另外,方法 terminated()可以被覆盖以执行Executor 完全终止后需要执行的任何特殊处理。
      如果钩子或回调方法引发异常,则内部worker *线程可能进而失败并突然终止。
    • 线程池终止。一个线程池在程序中不再继续有引用 ,并且没有剩余的存活线程,将自动 shutdown 。如果希望确保即使在用户忘记调用shutdown() 的情况下也能回收未引用的池,则必须通过设置适当的 keep-alive时间来让不再使用的线程最终都死亡。设置核心线程下限为0 和/或  设置 allowCoreThreadTimeOut(boolean)。
 
 

3.1内部状态

技术图片
变量ctl定义为AtomicInteger,记录了“线程池中的任务数量”和“线程池的状态”两个信息
技术图片

线程的状态:

  • RUNNING:线程池能够接受新任务,以及对新添加的任务进行处理。
  • SHUTDOWN:线程池不可以接受新任务,但是可以对已添加的任务进行处理。
  • STOP:线程池不接收新任务,不处理已添加的任务,并且会中断正在处理的任务
  • TIDYING:当所有的任务已终止,ctl记录的"任务数量"为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
  • TERMINATED:线程池彻底终止的状态
技术图片
各个状态之间转换:
技术图片
 
 

3.2  构造方法

构造方法可以让我们自定义(扩展)线程池。
/**
 * @param corePoolSize :保留在池中的线程数,即使线程处于空闲状态也不回收,除非设置allowCoreThreadTimeOut属性为true
 * @param maximumPoolSize:线程池中允许的最大线程数 
 * @param keepAliveTime:当线程数大于corePoolSize 时,这是多余的空闲线程 在终止之前 等待新任务的最长时间。
 * @param unit : keepAliveTime参数的时间单位
 * @param workQueue 在任务执行之前用于保留任务的队列。该队列仅保存由execute方法提交的Runnable任务。
 * @param threadFactory: executor 创建新线程时要使用的线程工厂
 * @param handler : 当线程数达到了线程上限,并且队列已经满了, 此时提交的execution被阻塞。handler就是发生该情况时要使用的处理程序。
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException(); //参数不合法
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException(); //参数存在空指针异常
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
 
 

线程数量要点

  • 如果运行线程的数量少于核心线程数量,则创建新的线程处理请求
  • 如果运行线程的数量大于核心线程数量,小于最大线程数量,则当队列满的时候才创建新的线程
  • 如果核心线程数量等于最大线程数量,那么将创建固定大小的连接池
  • 如果设置了最大线程数量为无穷,那么允许线程池适合任意的并发数量

线程空闲时间要点:

  • 当前线程数大于核心线程数,如果空闲时间已经超过了,那该线程会销毁

排队策略要点

  • 同步移交:不会放到队列中,而是等待线程执行它。如果当前线程没有执行,很可能会新开一个线程执行。
  • 无界限策略:如果核心线程都在工作,该线程会放到队列中。所以线程数不会超过核心线程数
  • 有界限策略:可以避免资源耗尽,但是一定程度上减低了吞吐量

当线程关闭或者线程数量满了和队列饱和了,就有拒绝任务的情况了:

拒绝任务策略:

  • 直接抛出异常
  • 使用调用者的线程来处理
  • 直接丢掉这个任务
  • 丢掉最老的任务
 

3.3  工厂方法获取默认实现的池

下面我就列举三个比较常见的实现池:

  • newFixedThreadPool(int)
  • newCachedThreadPool()
  • SingleThreadExecutor()

如果读懂了上面对应的策略,线程数量这些,应该就不会太难看懂了。

3.2.1 newFixedThreadPool

使用Executors工具类中的newFixedThreadPool()方法,它将返回一个corePoolSize和maximumPoolSize相等的线程池

//@param nThreads: the number of threads in the pool
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
} 
 

3.2.2newCachedThreadPool

非常有弹性的线程池,对于新的任务,如果此时线程池里没有空闲线程,线程池会毫不犹豫的创建一条新的线程去处理这个任务

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
 
 

3.2.3SingleThreadExecutor

使用单个worker线程的Executor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
 
 
 

四、execute执行方法

execute执行方法分了三步,以注释的方式写在代码上了~

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    //如果线程池中运行的线程数量<corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true)) //创建新进程执行
            return;
        c = ctl.get();
    }
    //如果线程池中运行的线程数量>=corePoolSize,
    if (isRunning(c) && workQueue.offer(command)) { //线程池处于RUNNING状态。把提交的任务成功放入阻塞队列中。
        int recheck = ctl.get(); //就再次检查线程池的状态,因为现有的线程自上次检查后可能死亡
        if (! isRunning(recheck) && remove(command)) //if线程池不是RUNNING状态,且成功从阻塞队列中删除任务,
            reject(command);  //则该任务由当前 RejectedExecutionHandler 处理。
        else if (workerCountOf(recheck) == 0) //否则如果线程池中运行的线程数量为0,
            addWorker(null, false); //通过addWorker(null, false)尝试新建一个线程,新建线程对应的任务为null。
    }
    // 如果以上两种case不成立,即没能将任务成功放入阻塞队列中,且addWoker新建线程失败,则该任务由当前 RejectedExecutionHandler 处理。
    else if (!addWorker(command, false))
        reject(command);
}
 
 
 addWorker(Runnable firstTask, boolean core) 方法:
  • 检查是否可以针对当前池状态和给定的界限(coreSize或maxSize)添加新的worker。如果可以,则将调整worker计数,并在可能的情况下创建并启动一个新的worker,并将firstTask作为其第一个任务运行。如果池已停止或可以关闭,则此方法返回false。如果线程工厂无法创建新线程,它也会返回false。如果线程创建失败(由于线程工厂返回null或由于异常(通常是Thread.start() 中的OutOfMemoryError)),我们将进行干净的回滚。
  • 参数:firstTask: 新线程应首先运行的任务(如果没有,则为null)。当初始线程的数量少于corePoolSize线程(在这种情况下,我们总是启动一个线程),或队列已满时(在这种情况下,我们必须绕过队列机制),我们需要绕过排队。使用初始的第一个任务创建worker以绕过排队。最初,空闲线程通常是通过prestartCoreThread创建的,或者用于替换其他垂死的工作线程。
  • 参数 core : 如果为true,请使用corePoolSize作为上限,否则是 maximumPoolSize。 
 

五、线程池关闭

ThreadPoolExecutor提供了shutdown()shutdownNow()两个方法来关闭线程池

shutdown() :

技术图片
 

shutdownNow():

技术图片
 

区别:

 

  • 调用shutdown()后,线程池状态立刻变为SHUTDOWN,而调用shutdownNow(),线程池状态立刻变为STOP
  • shutdown()等待任务执行完才中断线程,而shutdownNow()不等任务执行完就中断了线程。

 

 

 


TODO:

ScheduledThreadPoolExecutor
ForkJoinPool

 

 

 

 

 

 

以上是关于线程池的主要内容,如果未能解决你的问题,请参考以下文章

Java——线程池

Motan在服务provider端用于处理request的线程池

Java线程池详解

Java线程池详解

Java 线程池详解

线程池-实现一个取消选项