线程池的前世今生

Posted 根哥啊

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池的前世今生相关的知识,希望对你有一定的参考价值。

你关注的就是我关心的!

来源:码农的日常记录(ID:think_in_code)


01

线程池出现的背景

我们知道 Thread 的创建、启动以及销毁都是比较耗费系统资源的,比如创建一个线程时,系统需要为该线程创建局部变量表、程序计数器,以及独立的生命周期,过多的线程会占用很多内存,同时过多的线程会导致cpu占用过高,如果线程数量到达一定数目时,cpu仅仅在切换线程上下文就会占很多时间,而不去做别的事情。 我们举一个无限创建线程的例子:

 
   
   
 
  1. public class InfiniteThreadTest {

  2.    public static void main(String[] args) {

  3.        int threadName = 0;

  4.        while (true){

  5.            new Thread (()->{

  6.                try {

  7.                    String[] strArray = new String[1000];

  8.                    System.out.println (Thread.currentThread ().getName ());

  9.                    TimeUnit.MINUTES.sleep (5);

  10.                } catch (InterruptedException e) {

  11.                    e.printStackTrace ();

  12.                }

  13.            },"thread name : "+ threadName++).start ();

  14.        }

  15.    }

  16. }

可以看到,cpu和内存瞬间飙升直到机器卡死~~


02

原理

线程池的出现就是为了避免无限创建线程,同时避免重复创建线程,以节约系统资源,那么应该如何实现呢? 可以把线程池类比为存放线程的池子,当有任务提交给线程池时,池子中的某个线程会主动执行该任务。当池子中的线程都已经使用时,就需要把新提交的任务放入缓存队列中,当池子中出现新的空闲线程时,便从缓存队列中取出任务执行。 因此线程池需要以下要素:

  • 任务队列:缓存提交的任务

  • 线程数量:包括 线程池中的最大线程数量,和最小活跃线程数量

  • 任务拒绝策略: 线程池中数量满时以及任务队列满时,拒绝接受新的线程

提交新任务到线程池流程图:

线程池的前世今生

线程池的实现需要包含以下几个关键成员:

首先 需要实现线程池接口:

 
   
   
 
  1. public interface ThreadPool {


  2.    //提交任务到线程池

  3.    void execute(Runnable runnable);


  4.    //关闭线程池

  5.    void shutdown();


  6. }

接下来我们来实现这个线程池,需要包含生产线程的ThreadFactory、存放线程队列的RunnableQueue、和用来拒绝接受新线程的DenyPolicy

线程池的前世今生

线程池的整体实现:

 
   
   
 
  1. package executorservice;


  2. import java.util.ArrayDeque;

  3. import java.util.Queue;


  4. public class MyThreadPool extends Thread implements ThreadPool {


  5.    // 初始化线程数量

  6.    private final int initSize;


  7.    // 线程池最大线程数量

  8.    private final int maxSize;


  9.    // 线程池核心线程数量

  10.    private final int coreSize;


  11.    // 活跃线程数量

  12.    private int activeCount;


  13.    // 线程工厂

  14.    private ThreadFactory threadFactory;


  15.    // 任务队列

  16.    private final RunnableQueue runnableQueue;


  17.    // 线程池是否已经被关闭

  18.    private volatile boolean isShutdown = false;


  19.    // 工作线程队列

  20.    private final Queue<ThreadTask> threadQueue = new ArrayDeque<> ();


  21.    private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.AbortDenyPolicy ();


  22.    // 初始化时设置线程池属性

  23.    public MyThreadPool(int initSize, int maxSize, int coreSize, ThreadFactory threadFactory) {

  24.        this.initSize = initSize;

  25.        this.maxSize = maxSize;

  26.        this.coreSize = coreSize;

  27.        this.threadFactory = threadFactory;

  28.        this.runnableQueue = new LinkedRunnableQueue ();

  29.        init();

  30.    }


  31.    // 提交任务: 任务放入Runnable queue即可

  32.    @Override

  33.    public void execute(Runnable runnable) {

  34.        if(this.isShutdown){

  35.            throw new IllegalStateException (" The thread pool is destroy ");

  36.        }

  37.        this.runnableQueue.offer (runnable);

  38.    }


  39.    @Override

  40.    public void shutdown() {

  41.        //TODO

  42.    }


  43.    @Override

  44.    public void run() {

  45.        //TODO

  46.    }


  47.    private static class ThreadTask{

  48.        // TODO

  49.    }


  50.    private void init(){

  51.       // TODO

  52.    }


  53. }

execute方法只需要把被Runnable的具体实现放入 线程队列中,等待被线程执行程,那么线程是如何创建的呢?线程数量如何运行已经线程数量又是如何维护的呢?

我们使用简单工厂模式来创建线程,接受之前放入队列中的Ruunnable并生成Thread对象,同时指定线程组和线程名,调用端不用关心线程创建细节。

 
   
   
 
  1. public interface ThreadFactory {


  2.    //创建线程接口

  3.    Thread createThread(Runnable runnable);

  4. }

具体实现如下:

线程池的前世今生

这个线程的具体任务

接下来我们需要把创建好的Thread 交给线程池去运行,其作用就是用来循环获取 等待队列中的runnable对象,然后运行其run()方法。具体如何实现呢?

在线程池初始化时启动一组线程,数量为定义的initSize,每个线程循环获取等待队列中的任务,新增一个线程实现代码:

 
   
   
 
  1.    // 线程池中新增一个运行线程

  2.    private void newThread(){


  3.        // 从等待队列中不停的获取任务 ,并放在这个线程中进行执行

  4.        RunnableTask runnableTask = new RunnableTask (waitThreadQueue);


  5.        Thread thread = this.threadFactory.createThread (runnableTask);

  6.        ThreadTask threadTask = new ThreadTask (thread, runnableTask);

  7.        activeThreadQueue.offer (threadTask);

  8.        this.activeCount++;

  9.        thread.start ();

  10.    }

其中RunnableTask的实现:runnableQueue使用一个阻塞队列,take为空时会阻塞,直到队列中有新的runnable对象

 
   
   
 
  1. public class RunnableTask implements Runnable {


  2.    private final RunnableQueue runnableQueue;


  3.    private volatile boolean running = true;


  4.    public RunnableTask(RunnableQueue runnableQueue) {

  5.        this.runnableQueue = runnableQueue;

  6.    }


  7.    @Override

  8.    public void run() {


  9.        // 循环获取等待队列中的runnable对象,并运行run()方法

  10.        while( running && !Thread.currentThread ().isInterrupted ()) {

  11.            Runnable task = null;

  12.            try {

  13.                task = runnableQueue.take ();

  14.            } catch (InterruptedException e) {

  15.                e.printStackTrace ();

  16.            }

  17.            task.run ();

  18.        }

  19.    }

  20. }

接下来我们在线程池的init()方法中启动数量为initSize的newThread

 
   
   
 
  1. private void init(){

  2.        for (int i = 0; i < initSize; i++) {

  3.            newThread ();

  4.        }

  5.        start ();

  6.    }

start()为 启动线程池本身run(),他对等待队列进行监听,如果活跃数量小于核心数量,且等待队列中数量大于0时,新增一个线程

 
   
   
 
  1. @Override

  2.    public void run() {


  3.        while (!isShutdown && !isInterrupted ()) {


  4.            synchronized (this) {

  5.                // 判断任务队列中是否有等待的队列,同时活跃线程小于定义的核心线程数时,启动新的线程

  6.                if (waitThreadQueue.size () > 0 && activeCount < coreSize) {

  7.                    for (int i = initSize; i < coreSize; i++) {

  8.                        newThread ();

  9.                    }

  10.                }

  11.            }

  12.        }

  13.    }


03

总结

可以看到我们通过几个参数来制定线程池的初始化线程数量,核心线程数量等,并通过两个队列来维护活跃线程和等待任务。 每个活跃线程轮训获取队列中的任务执行。当任务数较多时,如果没有达到我们指定的最大线程数,便增加新的线程,这样避免的无限创造线程造成的资源浪费。

参考:

 https://github.com/stunode/example/tree/master/src/main/java/executorservice

平时使用线程池时需要注意的点:

  • 线程池最大数量需要指定

  • 等待线程队列中的数量需要指定,防止无限接受线程,最后达到OOM

最后,给出一个阿里开发手册中线程池的使用建议:


最近热文阅读:


2、

3、

4、

5、

6、


以上是关于线程池的前世今生的主要内容,如果未能解决你的问题,请参考以下文章

委托的前世今生

多线程的前世今生

线程的前世今生

Redis线程模型的前世今生

Redis 线程模型的前世今生

async&await的前世今生