线程池——彻底解析

Posted 小王子jvm

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池——彻底解析相关的知识,希望对你有一定的参考价值。

基本简介

什么是线程池,为什么使用线程池?

线程池其实就是一种多线程处理形式,处理过程中可以将任务添加到队列中,然后在创建线程后自动启动这些任务。这里的线程就是我们前面学过的线程,这里的任务就是我们前面学过的实现了Runnable或Callable接口的实例对象。

使用线程池最大的原因就是可以根据系统的需求和硬件环境灵活的控制线程的数量,且可以对所有线程进行统一的管理和控制,从而提高系统的运行效率,降低系统运行压力。当然了使用线程池的原因不仅仅只有这些,我们可以从线程池自身的优点上来进一步了解线程池的好处。

所以就可以总结出线程池的优势:

  • 线程和任务分离,提升线程重用性
  • 控制线程并发数量,降低服务器压力,统一管理所有线程
  • 提升系统响应速度,假如创建线程用的时间为T1,执行任务用的时间为T2,销毁线程用的时间为T3,那么使用线程池就免去了T1和T3的时间

构造方法参数

参数介绍

首先是代码如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 

涉及到7个参数:以及设计原则

  • corePoolSize:核心线程数(这个是必须的,创建了就不会被销毁)

    核心线程数的设计需要依据任务的处理时间和每秒产生的任务数量来确定,例如:执行一个任务需要0.1秒,系统有大概80%的时间每秒都会产生100个任务,那么要想在1秒内处理完这100个任务,就需要10个线程,此时我们就可以设计核心线程数为10。剩下的20%可以交给最大线程数。

  • maximumPoolSize:线程池最大线程数(比核心线程数大,就可以有零时线程,用完过一段时间就销毁)

    最大线程数的设计除了需要参照核心线程数的条件外,还需要参照系统每秒产生的最大任务数决定。例如:上述环境中,如果系统每秒最大产生的任务是1000个,那么,最大线程数=(最大任务数-任务队列长度)单个任务执行时间。既::最大线程数=(1000-200)*0.1=80个

  • keepAliveTime:线程池中非核心线程空闲的存活时间大小

    这个玩意就是有时候使用到了非核心线程,然后执行完之后,这些非核心线程可以存活的时间。

  • unit:线程空闲存活时间,就是上面存活时间的单位。

  • workQueue:存放任务的阻塞队列

    任务队列长度一般设计为:核心线程数/单个任务执行时间*2即可。例如上面的场景中,核心线程数设计为10,单个任务执行时间为0.1秒,则队列长度可以设计为200

  • threadFactory: 用于设置创建线程的工厂,可以给创建的线程设置有意义的名字,可方便排查问题

  • Handler:线城池的饱和策略事件,主要有四种类型。

参数的理解:假设一个银行有有两个核心业务员工(就好比是核心线程数),然后有一个经理(非核心线程),这个时候最大线程数为3,来了两个用户ab,此时核心员工为他们服务,然后又来一个c,此时的c就到大厅等待(好比任务队列),然后又来几个人,把大厅坐满了,后面还来,忙不过来了,这个时候经理上线被迫工作。

这可以看出几个等价关系式:maximumPoolSize = corePoolSize + workQueue

工作流程

任务队列

ArrayBlockingQueue

ArrayBlockingQueue(有界队列)是一个用数组实现的有界阻塞队列,按FIFO排序量。也就是我们需要指定这个队列的长度。

LinkedBlockingQueue

LinkedBlockingQueue(可设置容量队列)基于链表结构的阻塞队列,按FIFO排序任务,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE,吞吐量通常要高于ArrayBlockingQuene;newFixedThreadPool线程池使用了这个队列。

如果不设置大小,有时候任务执行时间一长,又在不断地产生新的任务,就会造成OOM,内存溢出。

DelayQueue

DelayQueue(延迟队列)是一个任务定时周期的延迟执行的队列。根据指定的执行时间从小到大排序,否则根据插入到队列的先后排序。newScheduledThreadPool线程池使用了这个队列。

PriorityBlockingQueue

PriorityBlockingQueue(优先级队列)是具有优先级的无界阻塞队列;

SynchronousQueue

SynchronousQueue(同步队列)一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene,newCachedThreadPool线程池使用了这个队列。

也就是说这个队列如果有任务,插入任务时候会被阻塞,直到队列中的任务被取出消费掉。同理,如果队列中没有任务,线程消费时候,会处于阻塞状态,直到这个队列中有了新的任务。

其他的也没多大新奇。只需要关注这三个就行了。

拒绝策略

当线程池的线程数达到最大线程数时,需要执行拒绝策略。拒绝策略需要实现RejectedExecutionHandler接口,并实现rejectedExecution(Runnable r, ThreadPoolExecutor executor)方法。不过Executors框架已经为我们实现了4种拒绝策略:

  • AbortPolicy(抛出一个异常,默认的)

    丢弃任务并抛出RejectedExecutionException异常。

  • DiscardPolicy(直接丢弃任务)

    丢弃任务,但是不抛出异常。可以配合这种模式进行自定义的处理方式。丢弃的是后面的加入的任务

  • DiscardOldestPolicy(丢弃队列里最老的任务,将当前这个任务继续提交给线程池)

    丢弃的是任务队列中前面的任务

  • CallerRunsPolicy(交给线程池调用所在的线程进行处理)

    由调用线程处理该任务。也就是哪个线程创建这个线程池,然后线程池就丢给创建它的线程处理。

//使用原生的创建方式,这也是阿里推荐的方式
public void testCreatePool()
    ThreadPoolExecutor executor = new ThreadPoolExecutor(1,2,
            2, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(3),
            new ThreadPoolExecutor.AbortPolicy());

自定义线程池

通过模拟来更加了解线程池。

自定义任务类

任务即需要被执行的东西,通常而言都会规定一个任务的接口,然后实现接口,这个接口就是我们需要执行的玩意。这里假设直接使用Runnable接口:(注意,这里是不过是用Runnable接口,并不是说这个玩意是一个线程)

/**
    需求:
        自定义线程池练习,这是任务类,需要实现Runnable;
        包含任务编号,每一个任务执行时间设计为0.2秒
 */
public class MyTask implements Runnable
    private int id;
    //由于run方法是重写接口中的方法,因此id这个属性初始化可以利用构造方法完成

    public MyTask(int id) 
        this.id = id;
    

    @Override
    public void run() 
        String name = Thread.currentThread().getName();
        System.out.println("线程:"+name+" 即将执行任务:"+id);
        try 
            Thread.sleep(200);
         catch (InterruptedException e) 
            e.printStackTrace();
        
        System.out.println("线程:"+name+" 完成了任务:"+id);
    

    @Override
    public String toString() 
        return "MyTask" +
                "id=" + id +
                '';
    

自定义线程类

既然有了任务,就需要线程需执行:(注意线程是需要从多个任务中获取来执行的)

/**
    需求:
        编写一个线程类,需要继承Thread类,设计一个属性,用于保存线程的名字;
        设计一个集合,用于保存所有的任务;
 */
public class MyWorker extends Thread
    private String name;//保存线程的名字
    private List<Runnable> tasks;
    //利用构造方法,给成员变量赋值

    public MyWorker(String name, List<Runnable> tasks) 
        super(name);
        this.tasks = tasks;
    

    @Override
    public void run() 
        //判断集合中是否有任务,只要有,就一直执行任务
        while (tasks.size()>0)
            Runnable r = tasks.remove(0);
            r.run();	//调用的是run,直接执行方法,而不是启动线程
        
    

自定义线程池

怎么管理任务,线程,就需要一个核心的线程池:

/**
    这是自定义的线程池类;

    成员变量:
        1:任务队列   集合  需要控制线程安全问题
        2:当前线程数量
        3:核心线程数量
        4:最大线程数量
        5:任务队列的长度
    成员方法
        1:提交任务;
            将任务添加到集合中,需要判断是否超出了任务总长度
        2:执行任务;
            判断当前线程的数量,决定创建核心线程还是非核心线程
 */
public class MyThreadPool 
    // 1:任务队列   集合  需要控制线程安全问题
    private List<Runnable> tasks = Collections.synchronizedList(new LinkedList<>());
    //2:当前线程数量
    private int num;
    //3:核心线程数量
    private int corePoolSize;
    //4:最大线程数量
    private int maxSize;
    //5:任务队列的长度
    private int workSize;

    public MyThreadPool(int corePoolSize, int maxSize, int workSize) 
        this.corePoolSize = corePoolSize;
        this.maxSize = maxSize;
        this.workSize = workSize;
    

    //1:提交任务;
    public void submit(Runnable r)
        //判断当前集合中任务的数量,是否超出了最大任务数量
        if(tasks.size()>=workSize)
            System.out.println("任务:"+r+"被丢弃了...");
        else 
            tasks.add(r);
            //执行任务
            execTask(r);
        
    
    //2:执行任务;
    private void execTask(Runnable r) 
        //判断当前线程池中的线程总数量,是否超出了核心数,
        if(num < corePoolSize)
            new MyWorker("核心线程:"+num,tasks).start();
            num++;
        else if(num < maxSize)
            new MyWorker("非核心线程:"+num,tasks).start();
            num++;
        else 
            System.out.println("任务:"+r+" 被缓存了...");
        
    

然后就是测试了,直接创建一个线程池,然后放任务:

public static void main(String[] args) 
    //创建线程池
    MyThreadPool pool = new MyThreadPool(2, 4, 20);
    //创建任务提交并执行
    for (int i = 0; i < 40; i++) 
        pool.submit(new MyTask(i));
    

[ 注意 ]:这里必须是在main方法测试,使用@Test这玩意是体现不出来的

内置线程池-ExecutorService

这几种方式都是需要使用Executors来构建的。

这里只列举几种常见的

通用的方法

  • void shutdown() 启动一次顺序关闭,执行以前提交的任务,但不接受新任务
public static void main(String[] args) throws InterruptedException 
    //Executors.newCachedThreadPool()获取ExecutorService
    ExecutorService executorService = Executors.newCachedThreadPool((Runnable r) -> 
        int n = 0;
        return new Thread(r,"自定义线程"+n++);
    );
    for (int i = 0; i < 20; i++) 
        Thread.sleep(100);
        executorService.submit(new MyRunnable(i));
    
    executorService.shutdown();
    //会报错,因为调用shutdown方法之后就不能提交新的任务
    //错误类型:java.util.concurrent.RejectedExecutionException
    executorService.submit(new MyRunnable(2));  

  • List<Runnable> shutdownNow() 停止所有正在执行的任务,暂停处理正在等待的任务,并返回等待执行的任务列表
public static void main(String[] args) throws InterruptedException 
    //Executors.newCachedThreadPool()获取ExecutorService
    ExecutorService executorService = Executors.newCachedThreadPool((Runnable r) -> 
        int n = 0;
        return new Thread(r,"自定义线程"+n++);
    );
    for (int i = 0; i < 20; i++) 
        executorService.submit(new MyRunnable(i));
    
    List<Runnable> runnables = executorService.shutdownNow();

newCachedThreadPool

创建一个默认的线程池对象,里面的线程可重用,且在第一次使用时才创建:

代码如下:(只需要任务提交就会给线程执行)

public class Test 
    public static void main(String[] args) 
        //Executors.newCachedThreadPool()获取ExecutorService
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 20; i++) 
            executorService.submit(new MyRunnable(i));
        

    

/**
 *  任务类,包含一个任务编号,在任务中,打印出是哪一个线程正在执行任务
 */
class MyRunnable implements Runnable
    private  int id;
    public MyRunnable(int id) 
        this.id = id;
    

    @Override
    public void run() 
        //获取线程的名称,打印一句话
        String name = Thread.currentThread().getName();
        System.out.println(name+"执行了任务..."+id);
    

源码如下:

public static ExecutorService newCachedThreadPool() 
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());

默认创建一个无核心线程的线程池。

当提交任务的速度大于处理任务的速度时,每次提交一个任务,就必然会创建一个线程。极端情况下会创建过多的线程,耗尽 CPU 和内存资源。由于空闲 60 秒的线程会被终止,长时间保持空闲的 CachedThreadPool 不会占用任何资源。

构造方法重载:

public class Test 
    public static void main(String[] args) throws InterruptedException 
        //Executors.newCachedThreadPool()获取ExecutorService
        ExecutorService executorService = Executors.newCachedThreadPool((Runnable r) -> 
            int n = 0;
            return new Thread(r,"自定义线程"+n++);
        );
        for (int i = 0; i < 20; i++) 
            Thread.sleep(100);
            executorService.submit(new MyRunnable(i));
        

    

/**
 *  任务类,包含一个任务编号,在任务中,打印出是哪一个线程正在执行任务
 */
class MyRunnable implements Runnable
    private  int id;
    public MyRunnable(int id) 
        this.id = id;
    

    @Override
    public void run() 
        //获取线程的名称,打印一句话
        String name = Thread.currentThread().getName();
        System.out.println(name+"执行了任务..."+id);
    

线程池中的所有线程都使用ThreadFactory来创建,这样的线程无需手动启动,自动执行。简言之就是可以使用这种方式来自定义执行的线程,之前都是使用默认的。

用于并发执行大量短期的小任务。

newFixedThreadPool

创建一个可重用固定线程数的线程池:

public class Test1 
    public static void main(String[] args) throws InterruptedException 
        //Executors.newCachedThreadPool()获取ExecutorService
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 20; i++) 
            Thread.sleep(100);
            executorService.submit(new MyRunnable(i));
        

    

源码如下:

public static ExecutorService newFixedThreadPool(int nThreads) 
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());

这种方式的特点:

  • 核心线程数和最大线程数大小一样
  • 没有所谓的非空闲时间,即keepAliveTime为0
  • 阻塞队列为无界队列LinkedBlockingQueue

同样这个可以定义ThreadFactory来自定义创建的线程长啥样:

public static void main(String[] args) throws InterruptedException 
    //Executors.newCachedThreadPool()获取ExecutorService
    ExecutorService executorService = Executors.newFixedThreadPool(3,(Runnable r) -> 
        int n = 0;
        return new Thread(r,"自定义线程"+n++);
    );
    for (int i = 0; i < 20; i++) 
        //Thread.sleep(1);
        executorService.submit(new MyRunnable(i));
    

FixedThreadPool 适用于处理CPU密集型的任务,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程,即适用执行长期的任务。

newSingleThreadExecutor

创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。

使用方式如下:

public static void main(String[] args) throws InterruptedException 
    //Executors.newCachedThreadPool()获取ExecutorService
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    /*ExecutorService executorService = Executors.newSingleThreadExecutor(3,(Runnable r) -> 
            int n = 0;
            return new Thread(r,"自定义线程"+n++);
        );*/
    for (int i = 0; i < 20; i++) 
        //Thread.sleep(1);
        executorService.submit(new MyRunnable(i));
    

源码如下:

public static ExecutorService newSingleThreadExecutor() 
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));

直接创建一个线程,相当于单线程运行了。

当前的唯一线程,从队列取任务,执行完一个,再继续取,一个人(一条线程)夜以继日地干活。

同理也可以自定义线程创建方式:

public static void main(String[] args) throws InterruptedException 
    //Executors.newCachedThreadPool()获取ExecutorService
    //ExecutorService executorService = Executors.newSingleThreadExecutor();
    ExecutorService executorService = Executors.newSingleThreadExecutor((Runnable r) -> 
        int n = 0;
        return new Thread(r,"自定义线程"+n++);
    );
    for (int i = 0; i < 20; i++) 
        //Thread.sleep(1);
        executorService.submit(new MyRunnable(i))以上是关于线程池——彻底解析的主要内容,如果未能解决你的问题,请参考以下文章

图解 | 你管这破玩意叫线程池?

看起来是线程池的BUG,但是我认为是源码设计不合理

图解 | 你管这破玩意叫线程池?

图解 | 你管这破玩意叫线程池?

面试必备:Java线程池解析

图解 | 你管这破玩意叫线程池?