线程池拒绝策略详解

Posted IT-老牛

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池拒绝策略详解相关的知识,希望对你有一定的参考价值。

文章目录

1.前言

当线程池已经关闭或达到饱和(最大线程和队列都已满)状态时,新提交的任务将会被拒绝。 ThreadPoolExecutor 定义了四种拒绝策略:

1、AbortPolicy
默认策略,在需要拒绝任务时抛出RejectedExecutionException;
2、CallerRunsPolicy
直接在 execute 方法的调用线程中运行被拒绝的任务,如果线程池已经关闭,任务将被丢弃;
3、DiscardPolicy
直接丢弃任务;
4、DiscardOldestPolicy
丢弃队列中等待时间最长的任务,并执行当前提交的任务,如果线程池已经关闭,任务将被丢弃。

我们也可以自定义拒绝策略,只需要实现 RejectedExecutionHandler; 需要注意的是,拒绝策略的运行需要指定线程池和队列的容量。

2.ThreadPoolExecutor创建线程方式

通过下面的demo来了解ThreadPoolExecutor创建线程的过程。

package com.bruce.demo7;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolSerialTest 

    public static void main(String[] args) 
        //核心线程数
        int corePoolSize = 3;
        //最大线程数
        int maximumPoolSize = 6;
        //超过 corePoolSize 线程数量的线程最大空闲时间
        long keepAliveTime = 2;
        //以秒为时间单位
        TimeUnit unit = TimeUnit.SECONDS;
        //创建工作队列,用于存放提交的等待执行任务
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(2);

        ThreadPoolExecutor threadPoolExecutor = null;

        try 
            //创建线程池
            threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
                    maximumPoolSize,
                    keepAliveTime,
                    unit,
                    workQueue,
                    new ThreadPoolExecutor.AbortPolicy());

            //循环提交任务
            for (int i = 0; i < 8; i++) 
                //提交任务的索引
                int index = (i + 1);
                threadPoolExecutor.submit(() -> 
                    //线程打印输出
                    System.out.println("大家好,我是线程:" + index);
                    try 
                        //模拟线程执行时间,10s
                        Thread.sleep(10000);
                     catch (InterruptedException e) 
                        e.printStackTrace();
                    
                );
                //每个任务提交后休眠500ms再提交下一个任务,用于保证提交顺序
                Thread.sleep(500);
            

         catch (Exception e) 
            e.printStackTrace();
         finally 
        

    


大家好,我是线程:1
大家好,我是线程:2
大家好,我是线程:3
大家好,我是线程:6
大家好,我是线程:7
大家好,我是线程:8
大家好,我是线程:4
大家好,我是线程:5

执行流程:

  1. 首先通过 ThreadPoolExecutor 构造函数创建线程池;
  2. 执行 for 循环,提交 8 个任务(恰好等于maximumPoolSize[最大线程数] + capacity[队列大小]);
    通过 threadPoolExecutor.submit 提交 Runnable 接口实现的执行任务;
  3. 提交第1个任务时,由于当前线程池中正在执行的任务为 0 ,小于 3(corePoolSize 指定),所以会创建一个线程用来执行提交的任务1;
  4. 提交第 2, 3 个任务的时候,由于当前线程池中正在执行的任务数量小于等于 3 (corePoolSize 指定),所以会为每一个提交的任务创建一个线程来执行任务;
  5. 当提交第4个任务的时候,由于当前正在执行的任务数量为 3 (因为每个线程任务执行时间为10s,所以提交第4个任务的时候,前面3个线程都还在执行中),此时会将第4个任务存放到 workQueue 队列中等待执行;
    由于 workQueue 队列的大小为 2 ,所以该队列中也就只能保存 2 个等待执行的任务,所以第5个任务也会保存到任务队列中;
  6. 当提交第6个任务的时候,因为当前线程池正在执行的任务数量为3,workQueue 队列中存储的任务数量也满了,这时会判断当前线程池中正在执行的任务的数量是否小于6(maximumPoolSize指定);
    如果小于 6 ,那么就会新创建一个线程来执行提交的任务 6;
  7. 执行第7,8个任务的时候,也要判断当前线程池中正在执行的任务数是否小于6(maximumPoolSize指定),如果小于6,那么也会立即新建线程来执行这些提交的任务;
  8. 此时,6个任务都已经提交完毕,那 workQueue 队列中的等待 任务4 和 任务5 什么时候执行呢?
    当任务1执行完毕后(10s后),执行任务1的线程并没有被销毁掉,而是获取 workQueue 中的任务4来执行;
  9. 当任务2执行完毕后,执行任务2的线程也没有被销毁,而是获取 workQueue 中的任务5来执行;

通过上面流程的分析,也就知道了之前案例的输出结果的原因。其实,线程池中会线程执行完毕后,并不会被立刻销毁,线程池中会保留 corePoolSize 数量的线程,当 workQueue 队列中存在任务或者有新提交任务时,那么会通过线程池中已有的线程来执行任务,避免了频繁的线程创建与销毁,而大于 corePoolSize 小于等于 maximumPoolSize 创建的线程,则会在空闲指定时间(keepAliveTime)后进行回收。

3.ThreadPoolExecutor拒绝策略测试

在上面的测试中,我设置的执行线程总数恰好等于maximumPoolSize[最大线程数] + capacity[队列大小],因此没有出现需要执行拒绝策略的情况,因此在这里,我再增加一个线程,提交9个任务,来演示不同的拒绝策略。

3.1.AbortPolicy

功能:当触发拒绝策略时,直接抛出拒绝执行的异常,中止策略的意思也就是打断当前执行流程
使用场景:这个就没有特殊的场景了,但是一点要正确处理抛出的异常。

package com.bruce.demo7;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolSerialTest 

    public static void main(String[] args) 
        //核心线程数
        int corePoolSize = 3;
        //最大线程数
        int maximumPoolSize = 6;
        //超过 corePoolSize 线程数量的线程最大空闲时间
        long keepAliveTime = 2;
        //以秒为时间单位
        TimeUnit unit = TimeUnit.SECONDS;
        //创建工作队列,用于存放提交的等待执行任务
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(2);

        ThreadPoolExecutor threadPoolExecutor = null;

        try 
            //创建线程池
            threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
                    maximumPoolSize,
                    keepAliveTime,
                    unit,
                    workQueue,
                    new ThreadPoolExecutor.AbortPolicy());

            //循环提交任务
            for (int i = 0; i < 9; i++) 
                //提交任务的索引
                int index = (i + 1);
                threadPoolExecutor.submit(() -> 
                    //线程打印输出
                    System.out.println("大家好,我是线程:" + index);
                    try 
                        //模拟线程执行时间,10s
                        Thread.sleep(10000);
                     catch (InterruptedException e) 
                        e.printStackTrace();
                    
                );
                //每个任务提交后休眠500ms再提交下一个任务,用于保证提交顺序
                Thread.sleep(500);
            

         catch (Exception e) 
            e.printStackTrace();
         finally 
        

    


大家好,我是线程:1
大家好,我是线程:2
大家好,我是线程:3
大家好,我是线程:6
大家好,我是线程:7
大家好,我是线程:8
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@7cca494b rejected from java.util.concurrent.ThreadPoolExecutor@7ba4f24f[Running, pool size = 6, active threads = 6, queued tasks = 2, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
	at com.bruce.demo7.ThreadPoolSerialTest.main(ThreadPoolSerialTest.java:37)
大家好,我是线程:4
大家好,我是线程:5

ThreadPoolExecutor中默认的策略就是AbortPolicyExecutorService接口的系列ThreadPoolExecutor因为都没有显示的设置拒绝策略,所以默认的都是这个。但是请注意,ExecutorService中的线程池实例队列都是无界的,也就是说把内存撑爆了都不会触发拒绝策略。

3.2.CallerRunsPolicy

将被拒绝的任务添加到线程池正在运行线程中去执行

功能:当触发拒绝策略时,只要线程池没有关闭,就由提交任务的当前线程处理。

使用场景:一般在不允许失败的、对性能要求不高、并发量较小的场景下使用,因为线程池一般情况下不会关闭,也就是提交的任务一定会被运行,但是由于是调用者线程自己执行的,当多次提交任务时,就会阻塞后续任务执行,性能和效率自然就慢了。

package com.bruce.demo7;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolSerialTest 

    public static void main(String[] args) 
        //核心线程数
        int corePoolSize = 3;
        //最大线程数
        int maximumPoolSize = 6;
        //超过 corePoolSize 线程数量的线程最大空闲时间
        long keepAliveTime = 2;
        //以秒为时间单位
        TimeUnit unit = TimeUnit.SECONDS;
        //创建工作队列,用于存放提交的等待执行任务
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(2);

        ThreadPoolExecutor threadPoolExecutor = null;

        try 
            //创建线程池
            threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
                    maximumPoolSize,
                    keepAliveTime,
                    unit,
                    workQueue,
                    new ThreadPoolExecutor.CallerRunsPolicy());

            //循环提交任务
            for (int i = 0; i < 9; i++) 
                //提交任务的索引
                int index = (i + 1);
                threadPoolExecutor.submit(() -> 
                    //线程打印输出
                    System.out.println(Thread.currentThread().getName()+"-->" + index);
                    try 
                        //模拟线程执行时间,10s
                        Thread.sleep(10000);
                     catch (InterruptedException e) 
                        e.printStackTrace();
                    
                );
                //每个任务提交后休眠500ms再提交下一个任务,用于保证提交顺序
                Thread.sleep(500);
            

         catch (Exception e) 
            e.printStackTrace();
         finally 
        

    


pool-1-thread-1-->1
pool-1-thread-2-->2
pool-1-thread-3-->3
pool-1-thread-4-->6
pool-1-thread-5-->7
pool-1-thread-6-->8
main-->9
pool-1-thread-1-->4
pool-1-thread-2-->5

3.3.DiscardPolicy

丢弃任务,不抛出异常

功能:直接静悄悄的丢弃这个任务,不触发任何动作

使用场景:如果你提交的任务无关紧要,你就可以使用它 。因为它就是个空实现,会悄无声息的吞噬你的的任务。所以这个策略基本上不用

package com.bruce.demo7;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolSerialTest 

    public static void main(String[] args) 
        //核心线程数
        int corePoolSize = 3;
        //最大线程数
        int maximumPoolSize = 6;
        //超过 corePoolSize 线程数量的线程最大空闲时间
        long keepAliveTime = 2;
        //以秒为时间单位
        TimeUnit unit = TimeUnit.SECONDS;
        //创建工作队列,用于存放提交的等待执行任务
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(2);

        ThreadPoolExecutor threadPoolExecutor = null;

        try 
            //创建线程池
            threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
                    maximumPoolSize,
                    keepAliveTime,
                    unit,
                    workQueue,
                    new ThreadPoolExecutor.DiscardPolicy());

            //循环提交任务
            for (int i = 0; i < 9; i++) 
                //提交任务的索引
                int index = (i + 1);
                threadPoolExecutor.submit(() -> 
                    //线程打印输出
                    System.out.println(Thread.currentThread().getName()+"-->" + index);
                    try 
                        //模拟线程执行时间,10s
                        Thread.sleep(10000);
                     catch (InterruptedException e) 
                        e.printStackTrace();
                    
                );
                //每个任务提交后休眠500ms再提交下一个任务,用于保证提交顺序
                Thread.sleep(500);
            

         catch (Exception e) 
            e.printStackTrace();
         finally 
        

    


pool-1-thread-1-->1
pool-1-thread-2-->2
pool-1-thread-3-->3
pool-1-thread-4-->6
pool-1-thread-5-->7
pool-1-thread-6-->8
pool-1-thread-1-->4
pool-1-thread-2-->5

3.4.DiscardOldestPolicy

丢弃队列最前面的任务,重新尝试执行任务。

功能:如果线程池未关闭,就弹出队列头部的元素,然后尝试执行

使用场景:这个策略还是会丢弃任务,丢弃时也是毫无声息,但是特点是丢弃的是老的未执行的任务,而且是待执行优先级较高的任务。基于这个特性,我能想到的场景就是,发布消息,和修改消息,当消息发布出去后,还未执行,此时更新的消息又来了,这个时候未执行的消息的版本比现在提交的消息版本要低就可以被丢弃了。因为队列中还有可能存在消息版本更低的消息会排队执行,所以在真正处理消息的时候一定要做好消息的版本比较。

package com.bruce.demo7;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolSerialTest 

    public static void main(String[] args) 
        //核心线程数
        int corePoolSize = 3;
        //最大线程数
        int maximumPoolSize = 6;
        //超过 corePoolSize 线程数量的线程最大空闲时间
        long keepAliveTime = 2;
        //以秒为时间单位
        TimeUnit unit = TimeUnit.SECONDS;
        //创建工作队列,用于存放提交的等待执行任务
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(2);

        ThreadPoolExecutor threadPoolExecutor = null;

        try 
            //创建线程池
            threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
                    maximumPoolSize,
                    keepAliveTime,
                    unit,
                    workQueue,
                    new ThreadPoolExecutor.DiscardOldestPolicy());

            //循环提交任务
            for (int i = 0; i < 9; i++) 
                //提交任务的索引
                int index = (i + 1);
                threadPoolExecutor.submit(() -> 
                    //线程打印输出
                    System.out.println(Thread.currentThread以上是关于线程池拒绝策略详解的主要内容,如果未能解决你的问题,请参考以下文章

线程池拒绝策略详解

线程池(详解):三大方法七大参数四种拒绝策略

java线程池工作原理及拒绝策略详解

线程池的拒绝策略示例

常见线程池详解(一篇足以)

常见线程池详解(一篇足以)