深入浅出Java线程池

Posted IT-老牛

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入浅出Java线程池相关的知识,希望对你有一定的参考价值。

课程目标

  • 【理解】线程池基本概念
  • 【理解】线程池工作原理
  • 【掌握】自定义线程池
  • 【应用】java内置线程池
  • 【应用】使用java内置线程池完成综合案例

1.线程池基础

1.1.什么是池?


连接池

1.2.什么是线程池

线程池其实就是一种多线程处理形式,处理过程中可以将任务添加到队列中,然后在创建线程后自动启动这些任务。这里的线程就是我们前面学过的线程,这里的任务就是我们前面学过的实现了RunnableCallable接口的实例对象;

1.3.为什么使用线程池


使用线程池最大的原因就是可以根据系统的需求和硬件环境灵活的控制线程的数量,且可以对所有线程进行统一的管理和控制,从而提高系统的运行效率,降低系统运行运行压力;当然了,使用线程池的原因不仅仅只有这些,我们可以从线程池自身的优点上来进一步了解线程池的好处;

1.4.程池有哪些优势

1、 线程和任务分离,提升线程重用性;
2、 控制线程并发数量,降低服务器压力,统一管理所有线程;
3、提升系统响应速度,假如创建线程用的时间为T1,执行任务用的时间为T2,销毁线程用的时间为T3,那么使用线程池就免去了T1和T3的时间;

1.5.线程池应用场景介绍

1、网购商品秒杀
2、云盘文件上传和下载
3、12306网上购票系统等
4、…

只要有并发的地方、任务数量大或小、每个任务执行时间长或短的都可以使用线程池;
只不过在使用线程池的时候,注意一下设置合理的线程池大小即可

2.Java线程池

2.1.java内置线程池

我们要想自定义线程池,必须先了解线程池的工作原理,才能自己定义线程池;这里我们通过观察java中ThreadPoolExecutor的源码来学习线程池的原理;(源码演示在idea中查看)

Java 中的线程池是通过 Executor 框架实现的,该框架中用到了 ExecutorExecutorsExecutorServiceThreadPoolExecutor 这几个类。

ThreadPoolExecutor部分源码

构造方法:

public ThreadPoolExecutor(int corePoolSize, //核心线程数量
                          int maximumPoolSize,//     最大线程数
                          long keepAliveTime, //       最大空闲时间
                          TimeUnit unit,         //        时间单位
                          BlockingQueue<Runnable> workQueue,   //   任务队列
                          ThreadFactory threadFactory,    // 线程工厂
                          RejectedExecutionHandler handler  //  饱和处理机制
	) 

线程池参数说明

  • corePoolSize 线程池的核心线程数;
  • maximumPoolSize 能容纳的最大线程数;
  • keepAliveTime 空闲线程存活时间;
  • unit 存活的时间单位;
  • workQueue 存放提交但未执行任务的队列;
  • threadFactory 创建线程的工厂类;
  • handler 等待队列满后的拒绝策略。

线程池中,有三个重要的参数,决定影响了拒绝策略:corePoolSize - 核心线程数,也即初始的线程数。workQueue - 阻塞队列 。 maximumPoolSize -最大线程数。

其中BlockingQueue的实现选择有:
ArrayBlockingQueue:基于数组结构的有界阻塞队列,按照FIFO的原则对任务进行排序
LinkedBlockingQueue:基于链表的阻塞队列,同样按照FIFO对任务进行排序
SynchronousQueue:一个不存储任何元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直阻塞
PriorityBlockingQueue:具有优先级的阻塞队列

当提交任务数大于 corePoolSize 的时候,会优先将任务放到 workQueue 阻塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到maximumPoolSize 最大线程数配置。此时,再多余的任务,则会触发线程池的拒绝策略了。

总结起来,也就是一句话,当提交的任务数大于(workQueue.size() + maximumPoolSize ),就会触发线程池的拒绝策略

拒绝策略共有四种实现:
AbortPolicy: 让调用者抛出 RejectedExecutionException 异常,这是默认策略
CallerRunsPolicy: 让调用者运行任务
DiscardPolicy: 放弃本次任务
DiscardOldestPolicy :放弃队列中最早的任务,本任务取而代之

2.2.ThreadPoolExecutor参数详解

我们可以通过下面的场景理解ThreadPoolExecutor中的各个参数;

  • a客户(任务)去银行(线程池)办理业务,但银行刚开始营业,窗口服务员还未就位(相当于线程池中初始线程数量为0),于是经理(线程池管理者)就安排1号工作人员(创建1号线程执行任务)接待a客户(创建线程);
  • 在a客户业务还没办完时,b客户(任务)又来了,于是经理(线程池管理者)就安排2号工作人员(创建2号线程执行任务)接待b客户(又创建了一个新的线程);假设该银行总共就2个窗口(核心线程数量是2);
    紧接着在a,b客户都没有结束的情况下c客户来了,于是经理(线程池管理者)就安排c客户先坐到银行大厅的座位上(空位相当于是任务队列)等候,并告知他: 如果1、2号工作人员空出,c就可以前去办理业务;
  • 此时d客户又到了银行,(工作人员都在忙,大厅座位也满了)于是经理赶紧安排临时工(新创建的线程)在大堂站着,手持pad设备给d客户办理业务;
  • 假如前面的业务都没有结束的时候e客户又来了,此时正式工作人员都上了,临时工也上了,座位也满了(临时工加正式员工的总数量就是最大线程数),于是经理只能按《超出银行最大接待能力处理办法》(饱和处理机制)拒接接待e客户;
  • 最后,进来办业务的人少了,大厅的临时工空闲时间也超过了1个小时(最大空闲时间),经理就会让这部分空闲的员工人下班.(销毁线程)
  • 但是为了保证银行银行正常工作(有一个allowCoreThreadTimeout变量控制是否允许销毁核心线程,默认false),即使正式工闲着,也不得提前下班,所以1、2号工作人员继续待着(池内保持核心线程数量);

2.3.线程池工作流程总结示意图

3. 自定义线程池-参数设计分析

通过观察Java中的内置线程池参数讲解和线程池工作流程总结,我们不难发现,要设计一个好的线程池,就必须合理的设置线程池的4个参数;那到底该如何合理的设计4个参数的值呢?我们一起往下看.

3.1.核心线程数(corePoolSize)

核心线程数的设计需要依据任务的处理时间和每秒产生的任务数量来确定,例如:执行一个任务需要0.1秒,系统百分之80的时间每秒都会产生100个任务,那么要想在1秒内处理完这100个任务,就需要10个线程,此时我们就可以设计核心线程数为10;当然实际情况不可能这么平均,所以我们一般按照8020原则设计即可,既按照百分之80的情况设计核心线程数,剩下的百分之20可以利用最大线程数处理;

3.2.任务队列长度(workQueue)

任务队列长度一般设计为:核心线程数/单个任务执行时间*2即可;例如上面的场景中,核心线程数设计为10,单个任务执行时间为0.1秒,则队列长度可以设计为200;

3.3.最大线程数(maximumPoolSize)

大线程数的设计除了需要参照核心线程数的条件外,还需要参照系统每秒产生的最大任务数决定:例如:上述环境中,如果系统每秒最大产生的任务是1000个,那么,最大线程数=(最大任务数-任务队列长度)*单个任务执行时间;既: 最大线程数=(1000-200)*0.1=80个;

3.4.最大空闲时间(keepAliveTime)

这个参数的设计完全参考系统运行环境和硬件压力设定,没有固定的参考值,用户可以根据经验和系统产生任务的时间间隔合理设置一个值即可;

上面4个参数的设置只是一般的设计原则,并不是固定的,用户也可以根据实际情况灵活调整!

3.5.自定义线程池-实现步骤

1、编写任务类(MyTask),实现Runnable接口;

package com.bruce.demo1;

/*
    需求:
        自定义线程池练习,这是任务类,需要实现Runnable;
        包含任务编号,每一个任务执行时间设计为0.2秒
 */
public class MyTask implements Runnable 

    private int id;
    //由于run方法是重写接口中的方法,因此id这个属性初始化可以利用构造方法完成

    public MyTask(int id) 
        this.id = id;
    

    public void run() 
        String name = Thread.currentThread().getName();
        System.out.println("线程:"+name+" 即将执行任务:"+id);
        try 
            Thread.sleep(200);
         catch (InterruptedException e) 
            e.printStackTrace();
        
        System.out.println("线程:"+name+" 完成了任务:"+id);
    

    @Override
    public String toString() 
        return "MyTask" +
                "id=" + id +
                '';
    


2、编写线程类(MyWorker),用于执行任务,需要持有所有任务;

package com.bruce.demo1;


import java.util.List;

/*
    需求:
        编写一个线程类,需要继承Thread类,设计一个属性,用于保存线程的名字;
        设计一个集合,用于保存所有的任务;
 */
public class MyWorker extends Thread

    private String name;//保存线程的名字
    private List<Runnable> tasks;

    public MyWorker(String name, List<Runnable> tasks) 
        super(name);
        this.tasks = tasks;
    

    @Override
    public void run() 
        //判断集合中是否有任务,只要有,就一直执行任务
        while (tasks.size()>0)
            Runnable r = tasks.remove(0);
            r.run();
        
    


3、编写线程池类(MyThreadPool),包含提交任务,执行任务的能力;

package com.bruce.demo1;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

/*
    这是自定义的线程池类;
    成员变量:
        1:任务队列   集合  需要控制线程安全问题
        2:当前线程数量
        3:核心线程数量
        4:最大线程数量
        5:任务队列的长度
    成员方法
        1:提交任务;
            将任务添加到集合中,需要判断是否超出了任务总长度
        2:执行任务;
            判断当前线程的数量,决定创建核心线程还是非核心线程
 */
public class MyThreadPool 
    // 1:任务队列   集合  需要控制线程安全问题
    private List<Runnable> tasks = Collections.synchronizedList(new LinkedList<Runnable>());
    //2:当前线程数量
    private int num;
    //3:核心线程数量
    private int corePoolSize;
    //4:最大线程数量
    private int maxSize;
    //5:任务队列的长度
    private int workSize;

    public MyThreadPool(int corePoolSize, int maxSize, int workSize) 
        this.corePoolSize = corePoolSize;
        this.maxSize = maxSize;
        this.workSize = workSize;
    

    //1:提交任务;
    public void submit(Runnable r)
        //判断当前集合中任务的数量,是否超出了最大任务数量
        if(tasks.size()>=workSize)
            System.out.println("任务:"+r+"被丢弃了...");
        else 
            tasks.add(r);
            //执行任务
            execTask(r);
        
    
    
    //2:执行任务;
    private void execTask(Runnable r) 
        //判断当前线程池中的线程总数量,是否超出了核心数,
        if(num < corePoolSize)
            new MyWorker("核心线程:"+num,tasks).start();
            num++;
        else if(num < maxSize)
            new MyWorker("非核心线程:"+num,tasks).start();
            num++;
        else 
            System.out.println("任务:"+r+" 被缓存了...");
        
    


4、编写测试类(MyTest),创建线程池对象,提交多个任务测试;

package com.bruce.demo1;

/*
    测试类:
        1: 创建线程池类对象;
        2: 提交多个任务
 */
public class MyTest 
    public static void main(String[] args) 
        //1:创建线程池类对象;
        MyThreadPool pool = new MyThreadPool(2,4,20);
        //2: 提交多个任务
        for (int i = 0; i <30 ; i++) 
            //3:创建任务对象,并提交给线程池
            MyTask my = new MyTask(i);
            pool.submit(my);
        
    


关于线程池的功能比较繁多,这里仅仅模拟了核心功能,其他功能大家可以自行思考补全;

4. Java内置线程池

4.1.Java内置线程池-ExecutorService介绍

ExecutorService接口是java内置的线程池接口,通过学习接口中的方法,可以快速的掌握java内置线程池的基本使用
常用方法:

  • void shutdown() 启动一次顺序关闭,执行以前提交的任务,但不接受新任务。
  • List<Runnable> shutdownNow() 停止所有正在执行的任务,暂停处理正在等待的任务,并返回等待执行的任务列表。
  • <T> Future<T> submit(Callable<T> task) 执行带返回值的任务,返回一个Future对象。
  • Future<?> submit(Runnable task) 执行 Runnable 任务,并返回一个表示该任务的 Future
  • <T> Future<T> submit(Runnable task, T result) 执行 Runnable 任务,并返回一个表示该任务的 Future

既然ExecutorService是一个接口,接口是无法直接创建对象的,那么我们该如何获取ExecutorService的对象呢?

4.2.Java内置线程池-ExecutorService获取

获取ExecutorService可以利用JDK中的Executors 类中的静态方法,常用获取方式如下:

static ExecutorService newCachedThreadPool() 创建一个默认的线程池对象,里面的线程可重用,且在第一次使用时才创建

static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
线程池中的所有线程都使用ThreadFactory来创建,这样的线程无需手动启动,自动执行;

package com.bruce.demo2;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
 *  练习Executors获取ExecutorService,然后调用方法,提交任务;
 */
public class MyTest01 

    public static void main(String[] args) 
        //test1();
        test2();
    

    //练习newCachedThreadPool方法
    private static void test1() 
        //1:使用工厂类获取线程池对象
        ExecutorService es = Executors.newCachedThreadPool();
        //2:提交任务;
        for (int i = 1; i <=10 ; i++) 
            es.submit(new MyRunnable(i));
        
    

    private static void test2() 
        //1:使用工厂类获取线程池对象
        ExecutorService es = Executors.newCachedThreadPool(new ThreadFactory() 
            int n=1;
            @Override
            public Thread newThread(Runnable r) 
                return new Thread(r,"自定义的线程名称"+n++);
            
        );

        //2:提交任务;
        for (int i = 1; i <=10 ; i++) 
            es.submit(new MyRunnable(i));
        
    



/*
    任务类,包含一个任务编号,在任务中,打印出是哪一个线程正在执行任务
 */
class MyRunnable implements Runnable
    private  int id;
    public MyRunnable(int id) 
        this.id = id;
    
    @Override
    public void run() 
        //获取线程的名称,打印一句话
        String name = Thread.currentThread().getName();
        System.out.println(name+"执行了任务..."+id);
    

static ExecutorService newFixedThreadPool(int nThreads) 创建一个可重用固定线程数的线程池

static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
创建一个可重用固定线程数的线程池且线程池中的所有线程都使用ThreadFactory来创建。

package com.bruce.demo2;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/*
 *练习Executors获取ExecutorService,然后调用方法,提交任务;
 */
public class MyTest02 
    public static void main(String[] args) 
        //test1();
        test2();
    

    //练习方法newFixedThreadPool
    private static void test1() 
        //1:使用工厂类获取线程池对象
        ExecutorService es = Executors.newFixedThreadPool(3);
        //2:提交任务;
        for (int i = 1; i <= 10; i++) 
            es.submit(new MyRunnable2(i));
        
    

    private static void test2() 
        //1:使用工厂类获取线程池对象
        ExecutorService es = Executors.newFixedThreadPool(3,new ThreadFactory() 
            int n=1;
            @Override
            public Thread newThread(Runnable r) 
                return new Thread(r,"自定义的线程名称"+n++);
            
        );
        //2:提交任务;
        for (int i = 1; i <=10 ; i++) 
            es.submit(new MyRunnable2(i));
        
    


/*
    任务类,包含一个任务编号,在任务中,打印出是哪一个线程正在执行任务
 */
class MyRunnable2 implements Runnable 
    private int id;

    public MyRunnable2(int id) 
        this.id = id;
    

    @Override
    public void run() 
        //获取线程的名称,打印一句话
        String name = Thread.currentThread().getName();
        System.out.println(name + "执行了任务..." + id);
    

static ExecutorService newSingleThreadExecutor()
创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。

static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
创建一个使用单个 worker 线程的 Executor,且线程池中的所有线程都使用ThreadFactory来创建。

package com.bruce.demo2;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class MyTest03 
    public static void main(String[] args) 
        //test1();
        test2();
    

    //练习方法newFixedThreadPool
    private static void test1() 
        //1:使用工厂类获取线程池对象
        ExecutorService es = Executors.newSingleThreadExecutor();
        //2:提交任务;
        for (int i = 1; i <=10 ; i++) 
            es.submit(new MyRunnable3(i));
        
    

    private static void test2() 
        //1:使用工厂类获取线程池对象
        ExecutorService es = Executors.newSingleThreadExecutor(new ThreadFactory() 
            int n=1;
            @Override
            public Thread newThread(Runnable r) 
                return new Thread(r,"自定义的线程名称"+n++);
            
        );
        //2:提交任务;
        for (int i = 1; i <=10 ; i++) 
            es.submit(new MyRunnable3(i));
        
    


/*
    任务类,包含一个任务编号,在任务中,打印出是哪一个线程正在执行任务
 */
class MyRunnable3 implements Runnable
    private  int id;
    public MyRunnable3(int id) 
        this.id = id;
    

    @Override
    public void run() 
        //获取线程的名称,打印一句话
        String name = Thread.currentThread().getName();
        System.out.println(name+"执行了任务..."+id);
    

4.3.Java内置线程池-ScheduledExecutorService

ScheduledExecutorServiceExecutorService的子

以上是关于深入浅出Java线程池的主要内容,如果未能解决你的问题,请参考以下文章

深入浅出Java线程池

深入浅出Java线程池

深入浅出JAVA线程池使用原理1

Java深入学习13:Java线程池

多线程——Java线程池原理深入

[Java] Java核心深入理解线程池ThreadPool