多线程学习总结之 线程池

Posted -marksman

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程学习总结之 线程池相关的知识,希望对你有一定的参考价值。

前言:

   本文基于jdk1.8。 前段时间换工作,面试时候每次都会问线程的问题,自己对多线程方面的知识没有花时间研究过,所以一问到线程就懵了,最近特地买了方腾飞老师的《Java并发编程的艺术》这本书学学这方面的知识。这篇随笔主要是我对线程池学习的总结,如有写的不好或不对的地方欢迎指出!

1、线程池的基本概念

  线程池可以理解为一种管理线程的容器,是由我们根据自己的需求创建出来的。使用线程池可以降低系统资源开销、提高响应速度并帮我们管理线程。

2、线程池的主要参数

  int  corePoolSize:核心池大小,线程池正常保持存活的线程数,默认情况下,当我们创建一个线程池,它不会立刻创建线程,而是等到有任务提交时才会创建,当然我们也可以调用线程池的prestartAllCoreThreads()方法,让线程池在创建时就创建corePoolSize数目的线程;

  int  maximuxPoolSize:最大线程池大小,线程池所允许创建的最大线程数;

  long  keepAliveTime:线程存活时间,当线程池中的线程数量大于核心池大小后,多出来的线程在空闲时间达到keepAliveTime后会被中断,如果任务比较多,并且每个任务执行时间短,那可以调大这个参数,以提高线程的利用率;

  TimeUnit  timeUnit:keepAliveTime的单位,值有:DAYS、HOURS、MINUTES、SECONDS、MILLISECONDS(毫秒)、MICROSECONDS(微秒)、NANOSECONDS(纳秒);

  BlockingQueue  workQueue:任务队列,主要实现类有:

    1)、LinkedBlockingQueue:基于链表的无界(最大值为Integer.MAX_VALUE)阻塞队列,按FIFO(先进先出)的规则对任务进行排序,使用了此队列的线程池中maximuxPoolSize和keepAliveTime这两个参数就没有意义了(原因下文解释);

    2)、ArrayBlockingQueue:基于数组的有界阻塞队列,按FIFO的规则对任务进行排序,可传入参数来自定义队列大小;

    3)、DelayedWorkQueue:基于堆的延迟队列,静态工厂Executors.newScheduledThreadPool(...)中使用了该队列;

    4)、PriorityBlockingQueue:具有优先级的阻塞队列;

    5)、SynchronousQueue:不存储任务的阻塞队列,每一个插入对应一个取出。

    吞吐量:SynchronousQueue > LinkedBlockingQueue > ArrayBlockingQueue 

  ThreadFactory  threadFactory:线程工厂,用来创建线程,可以通过线程工厂给新创建的线程设置更合理的名字、设置优先级等;

  RejectedExecutionHandler  handler:拒绝任务的接口处理器;

    拒绝策略有:a、AbortPolicy:拒绝任务并抛出异常,默认的策略;

          b、DiscardPolicy:直接拒绝不抛出异常;

          c、DiscardOldestPolicy:丢弃队列中最远的一个任务(最先进入队列的,FIFO),并执行当前任务;

          d、CallerRunsPolicy:只用调用者所在的线程来执行任务,不管其他线程的事。技术分享图片

          e、当然也可以自定义拒绝策略,来处理如记录日志、持久化等已有拒绝策略不能实现的功能,需实现RejectedExecutionHandler接口,重写rejectedExecution()方法。

3、线程池的工作原理

  线程池通过调用execute()方法工作,当然也可以调用submit()方法,主要区别是submit()方法可以返回任务执行的结果future对象,而execute()没有返回值,execute()方法的源码如下:

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn‘t, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { // 1、如果当前线程数小于核心池,则创建线程并执行当前任务
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { // 2、如果条件1不满足则将任务放进任务队列
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command)) // 如果线程池不处于运行状态,则拒绝
                reject(command);
            else if (workerCountOf(recheck) == 0) // 3、如果线程数没超过最大池数则创建线程并执行任务
                addWorker(null, false);
        }
        else if (!addWorker(command, false)) // 4、如果任务无法放入队列,则拒绝
            reject(command); // 拒绝
    }

线程池工作原理图:
技术分享图片

以上源码中1、2 、3 注释就对应线程池工作原理图中的1、2、3步的判断。

  这里注意,当线程池中的线程数量小于核心池量,并且这时线程池中还有空闲线程(之前执行任务的线程已经完成工作了),如果这时候有任务提交还是会创建新线程,因为execute()方法中只要当前线程池中线程数量小于核心池就调用addWorker()创建线程执行当前任务,这个似乎有一点不合理,不知 Doug Lea大神以后会不会改进。

  下面举个小例子来和线程池工作原理比较一下:有一个小工厂,最多能容纳20(maximumPoolSize)个工人(线程)干活,目前老板只招了10(corePoolSize)个工人,老板规定不管有没有活都要来上班,活不多时候可以一 部分人干 一部分人歇着,反正都是老员工老板养的起(核心池中一部分线程空闲,但不会被中断),工厂还有个小仓库(任务队列BlockingQueue),有时候活多了干不完,原料(任务)就堆到仓库里,仓库要是堆满了,老板就想办法了,由于老板比较抠门,就招了5个零时工(大于corePoolSize那部分),这批活做的差不多了,老板不想多养几个闲人就辞掉3个零时工(空闲线程达到设定的存活时间,中断),这时又来了一批活,量很大,于是老板又招了8个零时工,这时工厂的工位满了(线程数达到 maximumPoolSize),现在再有活来老板就拒绝了(RejectedExecutionHandler)。

  在介绍线程池参数时有说过如果任务队列是LinkedBlockingQueue,线程池大小和存活时间这两个参数就失效了,这里如果工厂的仓库是无限容量的,老板就不用担心活干不完啦,干不完的活直接扔进仓库就好了,并且老板还可以根据客户要求的期限对任务进行排序,这样就不用再招零时工,自然也没有辞退空闲零时工的事了。

4、常用线程池

    1)、FixedThreadPool   

    固定大小的线程池,它的核心池数和最大线程池数都是传入的参数的值,存活时间为0,即无任务时立即中断,任务队列是 LinkedBlockingQueue。

    优点:可控制并发数量,多出新近的任务会在队列中等待,任务可以无限多。

    缺点:线程池大小固定,随着业务量的变化,改起来不方便,但可以写在配置文件里。

    适用场景:一定数量的任务,执行所需时间长,为了满足管理资源的需求,而需要限制当前线程数量的应用场景,它适用于负载较轻的服务器。

源码:
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
使用示例:
public static void fixedThreadPoolTest(){
    ExecutorService fixdThreadPool = Executors.newFixedThreadPool(3);
    for (int i = 0;i < 10; i++){
        int index = i;
        fixdThreadPool.execute(() -> System.out.println(Thread.currentThread().getName()+":"+index));
    }
}
运行结果:
pool-1-thread-2:1
pool-1-thread-3:2
pool-1-thread-1:0
pool-1-thread-3:4
pool-1-thread-2:3
pool-1-thread-3:6
pool-1-thread-1:5
pool-1-thread-3:8
pool-1-thread-2:7
pool-1-thread-1:9

  2)、CachedTheadPool

    可缓存的线程池,核心池为0,最大线程池数为Integer.MAX_VALUE,空闲线程的存活时间60秒,任务队列是SynchronousQueue。

    优点:可根据需要灵活创建线程数量,空闲60秒就中断,节约系统资源。

    缺点:若使用场景不当,如任务很少,偶尔(60秒以上)来一个任务,那就每次都需要创建线程,这样就很消耗系统资源。

    适用场景:适用于执行大量短期异步任务或者负载较轻的服务器。

源码:
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
使用示例:
public static void cachedThreadPoolTest(){
    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    for (int i = 0;i < 10; i++){
        int index = i;
        try {
            Thread.sleep(index*1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cachedThreadPool.execute(() -> System.out.println(Thread.currentThread().getName()+":"+index));
    }
}

运行结果:
pool-1-thread-1:0
pool-1-thread-1:1
pool-1-thread-1:2
pool-1-thread-1:3
pool-1-thread-1:4
pool-1-thread-1:5
pool-1-thread-1:6
pool-1-thread-1:7
pool-1-thread-1:8
pool-1-thread-1:9

  3)、SingleThreadExecutor

    只有一个线程的线程池,核心池和最大线程池大小都是1,空闲线程存活时间是无意义的参数,任务队列是LinkedBlockingQueue。

    优点:线程池中有且只有一个线程一直存在着,任务按顺序执行,后来的任务在队列里排队等待。

    缺点:不适合并发场景。

    适用场景:任务需要按顺序并且无并发的执行。

源码:
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,
                      0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
使用示例:
public static void singleThreadExecutorTest(){
    ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    for (int i = 0;i < 10; i++){
        int index = i;
        singleThreadExecutor.execute(() -> System.out.println(Thread.currentThread().getName() + ":"+index));
    }
}
运行结果:
pool-1-thread-1:0
pool-1-thread-1:1
pool-1-thread-1:2
pool-1-thread-1:3
pool-1-thread-1:4
pool-1-thread-1:5
pool-1-thread-1:6
pool-1-thread-1:7
pool-1-thread-1:8
pool-1-thread-1:9

  4)、ScheduledThreadPool

    可执行定时或周期性任务的线程池,核心池为传入的参数值,最大线程池为Integer.MAX_VALUE,空闲线程存活时间为0,任务队列为DelayedWorkQueue。

    优点:可执行定时和周期性任务,书上说比Timer效果好,有时间测一下。

    缺点:暂时没想到。

    适用场景:有定时、周期性批量任务需求时,如银行批量代收付交易、处理对账、批量放款等。

源码:
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
}
使用示例:
public static void scheduledThreadPoolTest(){
   ScheduledExecutorService scheduledExecutorPool = Executors.newScheduledThreadPool(3);
     scheduledExecutorPool.scheduleAtFixedRate(() -> System.out.println(Thread.currentThread().getName()+
       ":delay 1 seconds,and execute every 3 seconds"),1,3,TimeUnit.SECONDS);
}
运行结果: pool
-1-thread-1:delay 1 seconds,and execute every 1 seconds pool-1-thread-1:delay 1 seconds,and execute every 1 seconds pool-1-thread-2:delay 1 seconds,and execute every 1 seconds pool-1-thread-2:delay 1 seconds,and execute every 1 seconds pool-1-thread-2:delay 1 seconds,and execute every 1 seconds pool-1-thread-2:delay 1 seconds,and execute every 1 seconds pool-1-thread-3:delay 1 seconds,and execute every 1 seconds pool-1-thread-3:delay 1 seconds,and execute every 1 seconds pool-1-thread-3:delay 1 seconds,and execute every 1 seconds

  5)、自定义线程池ThreadLocalExecutor

技术分享图片

    如果上述四种由Executors工厂类提供的常用的线程池满足不了你的业务需求,你可以自定义ThreadPoolExecutor,每个参数都可以按照你的需要设置。

源码:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler); } 使用示例: public static void threadPoolExecutorTest(){ int corePoolSize = 3, maximumPoolSize = 5; long keepAliveTime = 1; TimeUnit unit = TimeUnit.SECONDS; BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue(1); RejectedExecutionHandler rejectedExecutionHandler = (Runnable r, ThreadPoolExecutor executor) -> System.out.println("其实我是拒绝的"); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,keepAliveTime,unit,workQueue,rejectedExecutionHandler); for (int i = 0; i < 10; i++){ int index = i; threadPoolExecutor.execute(() -> System.out.println(Thread.currentThread().getName() + ":"+index)); } } 运行结果: pool-1-thread-2:1 pool-1-thread-4:4 pool-1-thread-3:2 其实我是拒绝的 pool-1-thread-1:0 pool-1-thread-1:7 pool-1-thread-5:5 pool-1-thread-2:3 其实我是拒绝的 pool-1-thread-4:8

5、合理线程池的参数

  1)、CPU密集型任务(如压缩和解压缩,这种需要CPU不停的计算的任务)应配置尽可能小的线程,如配置CPU个数+1 数量的线程池;

  2)、IO密集型任务,线程并不是一直在执行任务,则应配置尽可能多的线程,如2倍的CPU数;

  3)、混合性任务,如果可以拆分,将器拆分成一个CPU密集性任务和一个IO密集型任务,只要这两个任务执行时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果两个任务执行时间相差很大就没必要进行拆分了;

  4)、优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理,它可以让优先级高的任务先执行;

  5)、执行时间不同的任务可以交给不同规模的线程池来处理,或者可以适用优先级队列,让执行时间段的任务先执行;

  6)、是否依赖其他系统资源,如依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越长,则CPU空闲时间就越长,那么线程数应该设置的越大,这样才能更好的适用CPU;

  7)、尽量使用有界队列,因为有界队列可以增加系统的稳定性和预警能力(无界队列可能会因为任务太多积压在队列里而撑满内存,导致系统瘫痪),可以根据需要将队列设大一点,比如几千。

6、线程池的关闭     

  1)、shutdown() 将线程池的状态置为SHUTDOWN,线程池会将空闲的线程调用它的interrupt()进行中断,还在排队的任务取消,然后等待正在执行任务的线程执行完成后销毁所有线程;

  2)、shutdownNow() 将线程池的状态置为STOP, 然后遍历线程池中所有的线程,并逐个调用它们的interrupt()方法进行中断正在执行任务或者暂停的线程,并返回还在排队的任务列表。

 

参考资料:1、《Java并发编程的艺术》

        2、https://www.cnblogs.com/dolphin0520/p/3932921.html

     3、https://juejin.im/post/5b3cf259e51d45194e0b7204






以上是关于多线程学习总结之 线程池的主要内容,如果未能解决你的问题,请参考以下文章

springboot开发之多线程理解单例研究线程池学习和项目运行问题解决

springboot开发之多线程理解单例研究线程池学习和项目运行问题解决

springboot开发之多线程理解单例研究线程池学习和项目运行问题解决

springboot开发之多线程理解单例研究线程池学习和项目运行问题解决

springboot开发之多线程理解单例研究线程池学习和项目运行问题解决

线程学习知识点总结