Java 线程池原理及最佳实践(1.5W字,面试必问)

Posted 码匠笔记

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 线程池原理及最佳实践(1.5W字,面试必问)相关的知识,希望对你有一定的参考价值。

在线程池运行期间改变拒绝任务的策略。

,最终就会执行具体实现RejectedExecutionHandler接口的rejectedExecution(r,executor) 方法了。

    final void reject(Runnable command) 
        handler.rejectedExecution(command, this);
    
3. 线程池结合SpringBoot实战(结合项目)

SpringBoot使用线程池我们常见的有两种方式:

  1. 使用默认的线程池@Async
  2. 使用自定义的线程池

方式一:通过@Async注解调用

第一步:在Application启动类上面加上@EnableAsync

@SpringBootApplication
@EnableAsync
public class ThreadpoolApplication 
    public static void main(String[] args) 
        SpringApplication.run(ThreadpoolApplication.classargs);
    

第二步:在需要异步执行的方法上加上@Async注解

@Service
public class AsyncTest 
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Async
    public void hello(String name)
     //这里使用logger 方便查看执行的线程是什么
        logger.info("异步线程启动 started."+name);  
    

第三步:测试类进行测试验证

    @Autowired
    AsyncTest asyncTest;
    @Test
    void contextLoads() throws InterruptedException 
        asyncTest.hello("afsasfasf");
        //一定要休眠 不然主线程关闭了,子线程还没有启动
        Thread.sleep(1000);
    

查看打印的日志:

INFO 2276 --- [           main] c.h.s.t.t.ThreadpoolApplicationTests     : Started ThreadpoolApplicationTests in 3.003 seconds (JVM running for 5.342)

INFO 2276 --- [         task-1] c.h.s.threadpool.threadpool.AsyncTest    : 异步线程启动 started.afsasfasf

可以清楚的看到新开了一个task-1的线程执行任务。验证成功!!!

***注意:***@Async注解失效常景

方式二:使用自定义的线程池

在默认配置信息里面是没有线程池的拒绝策略设置的方法的,如果需要更换拒绝策略就需要自定义线程池,并且如果项目当中需要多个自定义的线程池,又要如何进行管理呢?

自定义Configuration

第一步:创建一个ThreadPoolConfig 先只配置一个线程池,并设置拒绝策略为CallerRunsPolicy

@Configuration
public class ThreadPoolConfig 

    @Bean("taskExecutor")
    public Executor taskExecutor() 
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //设置线程池参数信息
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("myExecutor--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        //修改拒绝策略为使用当前线程执行
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //初始化线程池
        taskExecutor.initialize();
        return taskExecutor;
    

然后执行之前写的测试代码发现,使用的线程池已经变成自定义的线程池了。

INFO 12740 --- [  myExecutor--2] c.h.s.t.t.ThreadpoolApplicationTests     : threadPoolTaskExecutor 创建线程

INFO 12740 --- [  myExecutor--1] c.h.s.threadpool.threadpool.AsyncTest    : 异步线程启动 started.async注解创建

第二步:如果配置有多个线程池,该如何指定线程池呢?

@Configuration
public class ThreadPoolConfig 

       @Bean("taskExecutor")
    public Executor taskExecutor() 
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //设置线程池参数信息
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("myExecutor--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        //修改拒绝策略为使用当前线程执行
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //初始化线程池
        taskExecutor.initialize();
        return taskExecutor;
    

    @Bean("poolExecutor")
    public Executor poolExecutor() 
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //设置线程池参数信息
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("myExecutor2--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        //修改拒绝策略为使用当前线程执行
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //初始化线程池
        taskExecutor.initialize();
        return taskExecutor;
    

    @Bean("taskPoolExecutor")
    public Executor taskPoolExecutor() 
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //设置线程池参数信息
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("myExecutor3--");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        //修改拒绝策略为使用当前线程执行
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //初始化线程池
        taskExecutor.initialize();
        return taskExecutor;
    

执行测试类,直接报错说找到多个类,不知道加载哪个类:

No qualifying bean of type \'org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor\' available: expected single matching bean but found 3: taskExecutor,taskPoolExecutor

由于测试类当中是这样自动注入的:

@Autowired
ThreadPoolTaskExecutor threadPoolTaskExecutor; 

考虑到@Autowired 以及@Resource两个注入时的存在多个类如何匹配问题,然后发现只要我们在注入时指定具体的bean就会调用对应的线程池!!!

即修改测试类如下:

    @Autowired
    AsyncTest asyncTest;
    @Autowired
    ThreadPoolTaskExecutor poolExecutor; //会去匹配 @Bean("poolExecutor") 这个线程池
    @Test
    void contextLoads() throws InterruptedException 
        asyncTest.hello("async注解创建");
        //一定要休眠 不然主线程关闭了,子线程还没有启动
        poolExecutor.submit(new Thread(()->
            logger.info("threadPoolTaskExecutor 创建线程");
        ));
        Thread.sleep(1000);
    

最后得到如下信息:

INFO 13636 --- [ myExecutor2--1] c.h.s.t.t.ThreadpoolApplicationTests     : threadPoolTaskExecutor 创建线程 INFO 13636 --- [  myExecutor--1] c.h.s.threadpool.threadpool.AsyncTest    : 异步线程启动 started.async注解创建

注意:如果是使用的@Async注解,只需要在注解里面指定bean的名称就可以切换到对应的线程池去了。如下所示:

 @Async("taskPoolExecutor")
    public void hello(String name)
        logger.info("异步线程启动 started."+name);
    

注意:如果有多个线程池,但是在@Async注解里面没有指定的话,会默认加载第一个配置的线程池

submit和executor区别

execute和submit都属于线程池的方法,execute只能提交Runnable类型的任务,而submit既能提交Runnable类型任务也能提交Callable类型任务。

execute会直接抛出任务执行时的异常,submit会吃掉异常,可通过Future的get方法[会阻塞]将任务执行时的异常重新抛出。

execute所属顶层接口是Executor,submit所属顶层接口是ExecutorService,实现类ThreadPoolExecutor重写了execute方法,抽象类AbstractExecutorService重写了submit方法。

submit和execute由于参数不同有四种实现形式,如下所示,本文主要研究这四种形式在各自使用场景下的区别和联系

这种提交的方式会返回一个Future对象,这个Future对象代表这线程的执行结果
当主线程调用Future的get方法的时候会获取到从线程中返回的结果数据。
如果在线程的执行过程中发生了异常,get会获取到异常的信息。
<T> Future<T> submit(Callable<T> task);

当线程正常结束的时候调用Future的get方法会返回result对象,当线程抛出异常的时候会获取到对应的异常的信息。
<T> Future<T> submit(Runnable task, T result);

提交一个Runable接口的对象,这样当调用get方法的时候,如果线程执行成功会直接返回null,如果线程执行异常会返回异常的信息
Future<?> submit(Runnable task);

void execute(Runnable command);

execute提交的方式只能提交一个Runnable的对象,且该方法的返回值是void,也即是提交后如果线程运行后,和主线程就脱离了关系了,当然可以设置一些变量来获取到线程的运行结果。并且当线程的执行过程中抛出了异常通常来说主线程也无法获取到异常的信息的,只有通过ThreadFactory主动设置线程的异常处理类才能感知到提交的线程中的异常信息。

4. 线程池原理

4.1 主要介绍线程池中线程复用原理

对于线程池的复用原理,可以简单的用一句话概括:创建指定数量的线程并开启,判断当前是否有任务执行,如果有则执行任务。再通俗易懂一些:创建指定数量的线程并运行,重写run方法,循环从任务队列中取Runnable对象,执行Runnable对象的run方法。

image-20210702000757740

接下来开始手写线程池吧,注意是简易线程池,跟JDK自带的线程池无法相提并论,在这里我省略了判断当前线程数有没有大于核心线程数的步骤,简化成直接从队列中取任务,对于理解原理来说已然足矣,代码如下:

public class MyExecutorService 
    /**
     * 一直保持运行的线程
     */

    private List<WorkThread> workThreads;

    /*
     * 任务队列容器
     */

    private BlockingDeque<Runnable> taskRunables;
     /*
     * 线程池当前是否停止
     */

    private volatile boolean isWorking = true;

    public MyExecutorService(int workThreads, int taskRunables) 
        this.workThreads = new ArrayList<>();
        this.taskRunables = new LinkedBlockingDeque<>(taskRunables);
        //直接运行核心线程
        for (int i = 0; i < workThreads; i++) 
            WorkThread workThread = new WorkThread();
            workThread.start();
            this.workThreads.add(workThread);
        
    
    
    /**
     * WorkThread累,线程池的任务类,类比JDK的worker
     */

    class WorkThread extends Thread 
        @Override
        public void run() 
            while (isWorking || taskRunables.size() != 0
                //获取任务
                Runnable task = taskRunables.poll();
                if (task != null
                    task.run();
                
            
        
    
    //执行execute,jdk中会存在各种判断,这里省略了
    public void execute(Runnable runnable) 
        //把任务加入队列
        taskRunables.offer(runnable);
    

    //停止线程池
    public void shutdown() 
        this.isWorking = false;
    

测试

//测试自定义的线程池
public static void main(String[] args) 
    MyExecutorService myExecutorService = new MyExecutorService(36);
    //运行8次
    for (int i = 0; i < 8; i++) 
        myExecutorService.execute(() -> 
            System.out.println(Thread.currentThread().getName() + "task begin");
        );
    
    myExecutorService.shutdown();

总结

通过以上分析并手写线程池,我们应该已经基本理解了线程池的复用机制原理,实际上JDK的实现机制远比我们手写的要复杂的多,主要有以下两点,可以让我们进一步加深理解:

  1. 当有新任务来的时候,首先判断当前的线程数有没有超过核心线程数,如果没超过则直接新建一个线程来执行新的任务,如果超过了则判断缓存队列有没有满,没满则将新任务放进缓存队列中,如果队列已满并且线程池中的线程数已经达到了指定的最大线程数,那就根据相应的策略拒绝任务,默认为抛异常。

  2. 当缓存队列中的任务都执行完毕后,线程池中的线程数如果大于核心线程数并且已经超过了指定的存活时间(存活时间通过队列的poll方法传入,如果指定时间内没有获取到任务,则break退出,线程运行结束),就销毁多出来的线程,直到线程池中的线程数等于核心线程数。此时剩余的线程会一直处于阻塞状态,等待新的任务到来。

有兴趣可以看这块的源码,大致的思想就是:

首先,线程池会有一个管理任务的队列,这个任务队列里存放的就是各种任务,线程池会一直不停循环的去查看消息队里有没有接到任务,如果没有,则继续循环,如果有了则开始创建线程,如果给这个线程池设定的容量是10个线程,那么当有任务的时候就会调用创建线程的函数方法去根据当前任务总数量依次创建线程(这里创建线程的函数方法都是你提前些好了的),线程中会写好循环获取任务队列里任务的逻辑、判断是否销毁该线程的逻辑、进入等待的逻辑,这样线程一旦创建出来就会循环的去查询任务队列里的任务,拿到任务后就执行,执行任务完毕后判断是否销毁该线程,如果不销毁就进入等待(sleep),等待时间过后继续查询消息是否有任务,如此循环,直到逻辑判断需要销毁该线程为止(一般都是根据设定时间去判断是否销毁,例如在线程创建的时候设置一个计时器去控制,如果180秒都没有接到新的任务,则销毁该线程) 。

2.5W 字详解线程与锁了,面试随便问!!

在 java 并发编程中,线程和锁永远是最重要的概念。语言规范虽然是规范描述,但是其中也有非常多的知识和最佳实践是值得学习的,相信这篇文章还是可以给很多读者提供学习参考的。 本文主要是翻译 + 解释 Oracle 《The Java Language Specification, Java SE 8

以上是关于Java 线程池原理及最佳实践(1.5W字,面试必问)的主要内容,如果未能解决你的问题,请参考以下文章

2.5W 字详解线程与锁了,面试随便问!!

2.5W 字详解线程与锁了,面试随便问!!

JUC并发编程线程池及相关面试题 详解

面试必会必知:线程池架构浅析

聊聊面试中的 Java 线程池

线程池的设计原理是什么?