Java中ThreadPool的原理与实现

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java中ThreadPool的原理与实现相关的知识,希望对你有一定的参考价值。

(1)为什么需要ThreadPool?

当我们在使用ThreadPool的时候,首先要明白为什么需要ThreadPool?ThreadPool中到底有些什么?

我们知道进程代表程序的一次执行。进程在创建过程中会加载可执行文件到内存(为了提高执行效率,一般是将可执行文件映射到进程的地址空间,进行lazy load),然后由线程执行可执行文件中的指令。多个进程可以执行同一个可执行文件。所以一个进程至少要有一个线程,一般称之为主线程。进程拥有自己的地址空间,进程中的线程则共享进程的地址空间,所以多个线程可以访问进程中定义的共享变量。这就是在多线程环境下为什么需要互斥保护共享变量的原因。

随着硬件的发展,电脑中的CPU配置也从一个变成了多个,从单核变成了多核。所以可以通过在程序中创建多个线程的方式来充分利用CPU的运算能力。在一个4核CPU上同时执行4个线程,比只执行1个线程对CPU的利用率要高很多。同时,多线程也可以大大提高用户的体验。比如在GUI应用中,主线程主要用来进行图形用户界面的刷新及响应用户的各种操作,而同时还会有多个工作线程负责进行运算或者加载数据。如果只有一个主线程,那么可想而知用户的体验会有多么的差。当从网络上加载数据的时候,因为主线程的阻塞将导致真个程序无法响应用户的任何输入。

既然多个线程的程序已经可以很好的运行,为什么还需要ThreadPool呢?答案很简单:提高执行效率。我们知道,当从程序中调用API创建一个线程的时候,会进行一次系统调用从而从用户态进入内核态。在内核中,操作系统会创建线程的管理单元,设置好线程的执行环境然后再切换到用户态执行线程函数。在线程销毁时,会再一次进入内核态,由操作系统销毁相应的管理单元及相关资源。所以说线程的创建与销毁是一件非常费时的操作。那么可不可以重用创建好的线程从而避免多次创建、销毁?答案就是ThreadPool。所以ThreadPool就是线程的管理器,当我们需要执行一个任务的时候,直接告诉ThreadPool,由ThreadPool选择一个线程来执行。一个thread pool的两个重要组成部分是:线程组和任务队列。

(2)Java中的ThreadPool

下面通过一个一个例子来看一下在Java中如何使用thread pool。首先需要定义一个task以让thread pool执行,代码如下:

public class Task implements Runnable {
    private String name;
    public Task(String name){
        this.name = name;
    }
    @Override
    public void run() {
        Long delay = (long)Math.random() * 10;
        try {
            TimeUnit.SECONDS.sleep(delay);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Task:" + name);
    }
}

接下来,我们就可以使用Executors工具类来创建thread pool并运行上面定义的task。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolTest {
    public static void main(String[] args){
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++){
            executor.execute(new Task("Task:" + i));
        }
        executor.shutdown();
    }
}

上面的代码创建了一个thread pool,代码执行的结果如下:

Task:Task:4
Task:Task:1
Task:Task:0
Task:Task:2
Task:Task:3

执行多次,会得到不同的执行结果。这也充分说明线程的执行顺序是不确定的。如果线程之间存在依赖,需要使用同步机制来保证线程的执行次序。

在Executors类中,定义了几种生成不同thread pool的方法,主要由以下几个:

  • public static ExecutorService newCachedThreadPool() //创建一个线程数动态调整的线程池,注意:线程数量无上限
  • public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) //同上,使用提供的线程工厂创建线程
  • public static ExecutorService newFixedThreadPool(int nThreads) //线程池中线程数量固定为nThreads
  • public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
  • public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
  • public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)
  • public static ScheduledExecutorService newSingleThreadScheduledExecutor() //创建只包含一个线程的可定时线程池
  • public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)
  • public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) //线程池中至少包含corePoolSize个线程
  • public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)

从接口定义可以看出,线程池主要分为两大类:可定时执行线程池(ScheduledExecutorService)和不可定时执行线程池(ExecutorService)。

对于ExecutorService,有两个主要的接口用来执行任务:

  • void execute(Runnable command)
  • <T> Future<T> submit(Callable<T> task)
  • <T> Future<T> submit(Runnable task, T result)
  • Future<?> submit(Runnable task)


execute和submit方法的主要区别是任务是否可控。在execute方法中,一个任务提交之后,只能等待任务完成,要么执行成功,要么失败。而在submit方法中,会返回一个Future对象,通过该对象可以尝试取消该任务及查询任务是否完成。

ScheduledExecutorService继承至ExecutorService,因此除了上面介绍的方法外,有另外几个可以用于定时的方法:

  • <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
  • ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)

(3)线程池的实现

知道了怎么样使用线程池,那么另外一个重要的问题是线程池怎么实现呢?为了搞清楚这个问题,首先看一下线程池相关类之间的关系。

技术分享

ThreadPoolExecutor是ExecutorService的实现类,ScheduledThreadPoolExecutor继承至ThreadPoolExecutor,是ScheduledExecutorService的实现类。我们以ThreadPoolExecutor为例看一下线程池的具体实现。

ThreadPoolExecutor的主要字段由以下几个:

  • private final BlockingQueue<Runnable> workQueue
  • private final ReentrantLock mainLock = new ReentrantLock();
  • private final HashSet<Worker> workers = new HashSet<Worker>(); //管理所有的工作线程
  • private final Condition termination = mainLock.newCondition(); //用于中断线程
  • private volatile ThreadFactory threadFactory; //创建线程的工厂类
  • private volatile long keepAliveTime; //线程空闲回收超时时间
  • private volatile int corePoolSize; //线程池中基本线程数量
  • private volatile int maximumPoolSize; //最大线程数
  1. 线程池的状态

    一个线程池处于下面四中状态之一:

    1. Running: 可以接受新的任务和处理已经在队列中的任务

    2. Shutdown: 不接受新的任务,但会处理已经在队列中的任务

    3. Stop: 不接受新的任务,也不处理队列中的任务,并会中断运行中的任务

    4. Tidying: 所有的任务都已经中止,工作线程数量为0. 在转换到Tidying状态时,会调用terminated()钩子方法

    5. Teriminated: 调用terminated()方法后所处的状态

线程池状态之间的转换如下图:

技术分享

线程池创建后默认是处于Running状态。shutdown()会使线程池进入shutdown状态,而shutdownNow()则会立即进入到stop状态。

线程池的状态在代码中定义如下:

private static final int COUNT_BITS = Integer.SIZE - 3;
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

有意思的是在Java线程池的实现中,状态与工作线程的数量放在了同一个字段中,而使用不同的位来表示状态和工作线程的数量。从上面的代码可以看出,状态使用了32位中最高3位保存,而低29位用于记录工作线程的数量。一些辅助的函数如下:

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
任务的提交与执行当通过execute(Runnable command)提交任务时,会通过以下3步进行处理:(a)如果线程池中的线程数量少于corePoolSize,创建一个新的线程,并将当前任务直接赋予新创建的线程进行执行(b)如果线程池的线程数量大于corePoolSize,将尝试将任务放入任务队列(workQueue)中。如果成功放入到任务队列中,将会检查线程池的状态。如果线程池正常运行,但线程池中的线程数量为0,将创建新的线程执行队列中的任务(c)如果线程池的线程数量大于corePoolSize,同时将任务放入任务队列失败时,将尝试创建新的线程执行提交的任务如果任务不能将任务放入任务队列,或者无法添加新的线程处理任务,则会拒绝该任务。默认是丢弃该任务。ThreadPoolExecutor定义了一个内部类Worker来实现对线程的封装private final class Worker     extends AbstractQueuedSynchronizer     implements RunnableWorker中两个主要的字段是:final Thread thread; //当前worker对应的线程Runnable firstTask; //初始任务每当需要创建一个新的线程时,就生成一个新的Worker对象,同时将Worker对象加入到workers集合中。在Worker的构造函数中,通过ThreadFactory创建一个新的线程并与当前worker对象绑定。因为Worker类实现了Runnable接口,所以当线程执行时,将执行Worker.run()方法,进而调用ThreadPoolExecutor.runWorker(Worker w)方法。在runWorker中,将处理firstTask或者从workQueue中取任务进行处理,所以相应的代码在一个循环中。如果没有任务可处理,则会缩减线程池中线程的数量。线程池的终止线程池可以通过shutdown()或者shutdownNow()两个方法终止。shutdownNow()会直接使线程池转换到stop状态。线程池终止时,对于每个worker对象,调用线程的interrupt()终止线程的执行。

(4)总结

  1. 线程池适合大量的异步任务的执行

  2. Executors.newCachedThreadPool()创建的线程池没有线程上限,所以有可能用尽系统中的所有资源

  3. 线程池中两个主要部分是:线程集合管理和任务队列管理

  4. 每个线程的主循环会从任务队列中取任务并处理任务


至此,你是否了解了线程池是什么以及怎么实现的?如果你来写线程池,你会怎样做?

本文出自 “自由飞翔” 博客,请务必保留此出处http://fly2sun.blog.51cto.com/11682229/1786783

以上是关于Java中ThreadPool的原理与实现的主要内容,如果未能解决你的问题,请参考以下文章

可扩/减容线程池C语言原理讲解及代码实现

完全解析线程池ThreadPool原理&使用

java核心知识点 --- 线程池ThreadPool

详述Java线程池实现原理

详述Java线程池实现原理

《Elasticsearch 源码解析与优化实战》第16章:ThreadPool模块分析