SpringBoot - 优雅的实现异步编程

Posted 小小工匠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot - 优雅的实现异步编程相关的知识,希望对你有一定的参考价值。

文章目录


概述

Spring3开始提供了@Async注解,我们只需要在方法上标注此注解,此方法即可实现异步调用。 除此之外, 还得需要一个配置类,通过@EnableAsync 来开启异步功能 。


V1.0 默认的实现

Step1 搞配置类,开启@EnableAsync

我们需要使用@EnableAsync来开启异步任务支持。

@EnableAsync注解可以直接放在SpringBoot启动类上,也可以单独放在其他配置类上。

我们这里选择单独搞个配置类

@Configuration
@EnableAsync
public class ThreadPoolTaskConfig 




Step2 搞方法标记 @Async注解

package com.artisan.jobs;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2022/3/1 0:42
 * @mark: show me the code , change the world
 */

@Component
@Slf4j
public class AsyncJob 


    @Async 
    public void job1() throws InterruptedException 
        long beginTime = System.currentTimeMillis();
        Thread.sleep(2000);
        long endTime = System.currentTimeMillis();
        log.info("job1 cost  ms", endTime - beginTime);
    

    @Async 
    public void job2() throws InterruptedException 
        long beginTime = System.currentTimeMillis();
        Thread.sleep(2000);
        long endTime = System.currentTimeMillis();
        log.info("job2 cost  ms", endTime - beginTime);
    

    


Step3 搞调用

package com.artisan.controller;

import com.artisan.jobs.AsyncJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2022/3/1 0:44
 * @mark: show me the code , change the world
 */

@RestController
@RequestMapping("/async")
@Slf4j
public class AsyncController 


    @Autowired
    private AsyncJob asyncJob;

    @RequestMapping("/job")
    public String task() throws InterruptedException 
        long beginTime = System.currentTimeMillis();

        // 执行异步任务
        asyncJob.job1();
        asyncJob.job2();

        // 模拟业务耗时
        Thread.sleep(1000);


        long cost = System.currentTimeMillis() - beginTime;
        log.info("main cost  ms", cost);

        return "Task Cost " + cost + " ms";
    

    

@Async注解在默认情况下用的是SimpleAsyncTaskExecutor线,不是真正意义上的线程池。



所以,线程名称是 task-1 , task-2, task-3 , task-4…

2022-03-02 22:33:47.007 [http-nio-8080-exec-6] INFO  com.artisan.controller.AsyncController:39 - main cost 1001 ms
2022-03-02 22:33:47.675 [http-nio-8080-exec-2] INFO  com.artisan.controller.AsyncController:39 - main cost 1001 ms
2022-03-02 22:33:48.021 [task-4] INFO  com.artisan.jobs.AsyncJob:35 - job2 cost 2014 ms
2022-03-02 22:33:48.021 [task-3] INFO  com.artisan.jobs.AsyncJob:26 - job1 cost 2014 ms
2022-03-02 22:33:48.396 [http-nio-8080-exec-5] INFO  com.artisan.controller.AsyncController:39 - main cost 1015 ms
2022-03-02 22:33:48.678 [task-6] INFO  com.artisan.jobs.AsyncJob:35 - job2 cost 2004 ms
2022-03-02 22:33:48.678 [task-5] INFO  com.artisan.jobs.AsyncJob:26 - job1 cost 2004 ms
2022-03-02 22:33:49.004 [http-nio-8080-exec-3] INFO  com.artisan.controller.AsyncController:39 - main cost 1008 ms
2022-03-02 22:33:49.393 [task-8] INFO  com.artisan.jobs.AsyncJob:35 - job2 cost 2011 ms
2022-03-02 22:33:49.393 [task-7] INFO  com.artisan.jobs.AsyncJob:26 - job1 cost 2011 ms
2022-03-02 22:33:50.012 [task-9] INFO  com.artisan.jobs.AsyncJob:26 - job1 cost 2015 ms
2022-03-02 22:33:50.012 [task-10] INFO  com.artisan.jobs.AsyncJob:35 - job2 cost 2015 ms

可以看到,每次调用都会new一个线程。若系统中不断的创建线程…


Spring提供的线程池

名称说明
SimpleAsyncTaskExecutor这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地
ConcurrentTaskExecutorExecutor的适配类,不推荐使用。如ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类
ThreadPoolTaskScheduler可以使用cron表达式
ThreadPoolTaskExecutor推荐。 是对java.util.concurrent.ThreadPoolExecutor的包装


V2.0 实现@Async的自定义线程池

package com.artisan.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author 小工匠
 * @version 1.0
 * @description: 使用@EnableAsync来开启异步任务支持,@EnableAsync注解可以直接放在SpringBoot启动类上,也可以单独放在其他配置类上。
 * 我们这里选择使用单独的配置类AsyncConfiguration。
 * @date 2022/3/1 0:41
 * @mark: show me the code , change the world
 */

@Configuration
@EnableAsync
public class ThreadPoolTaskConfig 

    /**
     * 核心线程数(默认线程数)
     */
    private static final int CORE_POOL_SIZE = 5;
    /**
     * 最大线程数
     */
    private static final int MAX_POOL_SIZE = 10;
    /**
     * 允许线程空闲时间(单位:默认为秒)
     */
    private static final int KEEP_ALIVE_TIME = 10;
    /**
     * 缓冲队列大小
     */
    private static final int QUEUE_CAPACITY = 200;
    /**
     * 线程池名前缀
     */
    private static final String THREAD_NAME_PREFIX = "Async-Service-";


    /**
     * 自定义线程池
     *
     * @return
     */
    @Bean("customAsyncPoolTaskExecutor")
    public ThreadPoolTaskExecutor taskExecutor() 
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(CORE_POOL_SIZE);
        executor.setMaxPoolSize(MAX_POOL_SIZE);
        executor.setQueueCapacity(KEEP_ALIVE_TIME);
        executor.setKeepAliveSeconds(QUEUE_CAPACITY);
        executor.setThreadNamePrefix(THREAD_NAME_PREFIX);
        /**
         * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
         * 通常有以下四种策略:
         * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
         * ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
         * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
         * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
         */
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    




    

其他保持不变, 重启测试


V3.0 多个线程池处理

需求: 不同的业务,使用不同的线程池

多个线程池

package com.artisan.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author 小工匠
 * @version 1.0
 * @description: 使用@EnableAsync来开启异步任务支持,@EnableAsync注解可以直接放在SpringBoot启动类上,也可以单独放在其他配置类上。
 * 我们这里选择使用单独的配置类ThreadPoolTaskConfig
 * @date 2022/3/1 0:41
 * @mark: show me the code , change the world
 */

@Configuration
@EnableAsync
public class ThreadPoolTaskConfig 

    /**
     * 核心线程数(默认线程数)
     */
    private static final int CORE_POOL_SIZE = 5;
    /**
     * 最大线程数
     */
    private static final int MAX_POOL_SIZE = 10;
    /**
     * 允许线程空闲时间(单位:默认为秒)
     */
    private static final int KEEP_ALIVE_TIME = 10;
    /**
     * 缓冲队列大小
     */
    private static final int QUEUE_CAPACITY = 200;
    /**
     * 线程池名前缀
     */
    private static final String THREAD_NAME_PREFIX = "Biz1_Async-Service-";

    /**
     * 线程池名前缀
     */
    private static final String THREAD_NAME_PREFIX_2= "Biz2_Async-Service-";

    /**
     * 自定义线程池
     *
     * @return
     */
    @Bean("tp1")
    public ThreadPoolTaskExecutor taskExecutor() 
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(CORE_POOL_SIZE);
        executor.setMaxPoolSize(MAX_POOL_SIZE);
        executor.setQueueCapacity(KEEP_ALIVE_TIME);
        executor.setKeepAliveSeconds(QUEUE_CAPACITY);
        executor.setThreadNamePrefix(THREAD_NAME_PREFIX);
        /**
         * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
         * 通常有以下四种策略:
         * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
         * ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
         * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
         * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
         */
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    


    /**
     * 自定义线程池
     *
     * @return
     */
    @Bean("tp2")
    public ThreadPoolTaskExecutor taskExecutor2() 
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(CORE_POOL_SIZE);
        executor.setMaxPoolSize(MAX_POOL_SIZE);
        executor.setQueueCapacity(KEEP_ALIVE_TIME);
        executor.setKeepAliveSeconds(QUEUE_CAPACITY);
        executor.setThreadNamePrefix(THREAD_NAME_PREFIX_2);
        /**
         * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
         * 通常有以下四种策略:
         * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
         * ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
         * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
         * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
         */
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    


    

配置 多个线程池, 然后 为@Async指定线程池名字即可实现 多个线程池处理

package com.artisan.jobs;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * @author 小工匠
 * @version 1.0
 * @description: TODO
 * @date 2022/3/1 0:42
 * @mark: show me the code , change the world
 */

@Component
@Slf4j
public class AsyncJob 


    @Async("tp1")
    public void job1() throws InterruptedException 
        long beginTime = System.currentTimeMillis();
        Thread.sleep(2000);
        long endTime = System.currentTimeMillis();
        log.info("job1 cost  ms", endTime - beginTime);
    

    @Async("tp2")
    public void job2() throws InterruptedException 
        long beginTime = System.currentTimeMillis();
        Thread.sleep(2000);
        long endTime = System.currentTimeMillis();
        log.info("job2 cost  ms", endTime - beginTime);
    

    @Async()
    public void job3() throws InterruptedException 
        long beginTime = System.currentTimeMillis();
        Thread.sleep(2000);
        long endTime = System.currentTimeMillis();
        log.info("job3 cost  ms", endTime - beginTime);
    

    

默认线程池

@Async()没标注,用哪个?????? 当系统存在多个线程池时,我们也可以配置一个默认线程池 ,配置类让其实现AsyncConfigurer,并重写getAsyncExecutor()方法,指定默认线程池

package com.artisan.multi;

import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 * <p>
 * 实现AsyncConfigurer,并重写getAsyncExecutor()方法,指定默认线程池
 */

@Configuration
@EnableAsync
@Slf4j
public class DefaultAsyncConfiguration implements AsyncConfigurer 


    /**
     * 核心线程数(默认线程数)
     */
    private static final int CORE_POOL_SIZE = 2;
    /**
     * 最大线程数
     */
    private static final int MAX_POOL_SIZE = 10;
    /**
     * 允许线程空闲时间(单位:默认为秒)
     */
    private static final int KEEP_ALIVE_TIME = 10;
    /**
     * 缓冲队列大小
     */
    private static final int QUEUE_CAPACITY = 200;
    /**
     * 线程池名前缀
     */
    private static final String THREAD_NAME_PREFIX = "Default_Async-Service-";


    @Bean(name = "defaultPool")
    public ThreadPoolTaskExecutor executor() 
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
        taskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
        taskExecutor.setQueueCapacity(KEEP_ALIVE_TIME);
        taskExecutor.setKeepAliveSeconds(QUEUE_CAPACITY);
        taskExecutor.setThreadNamePrefix(THREAD_NAME_PREFIX);
        /**
         * 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
         * 通常有以下四种策略:
         * ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
         * ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
         * ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
         * ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
         */
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.initialize();
        return taskExecutor;
    

    /**
     * 指定默认线程池
     */
    @Override
    public Executor getAsyncExecutor() 
        return executor();
    

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() 
        return (ex, method, params) -> log.error("线程池执行任务发横未知错误,执行方法:", method.getName(), ex);
    





    
以上是关于SpringBoot - 优雅的实现异步编程的主要内容,如果未能解决你的问题,请参考以下文章

SpringBoot系列——@Async优雅的异步调用

重学SpringBoot系列之异步任务与定时任务

SpringBoot - 优雅的实现流控

UniRx—简洁优雅的响应式编程框架

Python 工匠:善用变量来改善代码质量

SpringBoot 如何异步编程,老鸟们都这么玩的