SpringBoot - 优雅的实现异步编程
Posted 小小工匠
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot - 优雅的实现异步编程相关的知识,希望对你有一定的参考价值。
文章目录
概述
Spring3开始提供了@Async注解,我们只需要在方法上标注此注解,此方法即可实现异步调用。 除此之外, 还得需要一个配置类,通过@EnableAsync 来开启异步功能 。
V1.0 默认的实现
Step1 搞配置类,开启@EnableAsync
我们需要使用@EnableAsync来开启异步任务支持。
@EnableAsync注解可以直接放在SpringBoot启动类上,也可以单独放在其他配置类上。
我们这里选择单独搞个配置类
@Configuration
@EnableAsync
public class ThreadPoolTaskConfig
Step2 搞方法标记 @Async注解
package com.artisan.jobs;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* @author 小工匠
* @version 1.0
* @description: TODO
* @date 2022/3/1 0:42
* @mark: show me the code , change the world
*/
@Component
@Slf4j
public class AsyncJob
@Async
public void job1() throws InterruptedException
long beginTime = System.currentTimeMillis();
Thread.sleep(2000);
long endTime = System.currentTimeMillis();
log.info("job1 cost ms", endTime - beginTime);
@Async
public void job2() throws InterruptedException
long beginTime = System.currentTimeMillis();
Thread.sleep(2000);
long endTime = System.currentTimeMillis();
log.info("job2 cost ms", endTime - beginTime);
Step3 搞调用
package com.artisan.controller;
import com.artisan.jobs.AsyncJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author 小工匠
* @version 1.0
* @description: TODO
* @date 2022/3/1 0:44
* @mark: show me the code , change the world
*/
@RestController
@RequestMapping("/async")
@Slf4j
public class AsyncController
@Autowired
private AsyncJob asyncJob;
@RequestMapping("/job")
public String task() throws InterruptedException
long beginTime = System.currentTimeMillis();
// 执行异步任务
asyncJob.job1();
asyncJob.job2();
// 模拟业务耗时
Thread.sleep(1000);
long cost = System.currentTimeMillis() - beginTime;
log.info("main cost ms", cost);
return "Task Cost " + cost + " ms";
@Async注解在默认情况下用的是SimpleAsyncTaskExecutor
线,不是真正意义上的线程池。
所以,线程名称是 task-1 , task-2, task-3 , task-4…
2022-03-02 22:33:47.007 [http-nio-8080-exec-6] INFO com.artisan.controller.AsyncController:39 - main cost 1001 ms
2022-03-02 22:33:47.675 [http-nio-8080-exec-2] INFO com.artisan.controller.AsyncController:39 - main cost 1001 ms
2022-03-02 22:33:48.021 [task-4] INFO com.artisan.jobs.AsyncJob:35 - job2 cost 2014 ms
2022-03-02 22:33:48.021 [task-3] INFO com.artisan.jobs.AsyncJob:26 - job1 cost 2014 ms
2022-03-02 22:33:48.396 [http-nio-8080-exec-5] INFO com.artisan.controller.AsyncController:39 - main cost 1015 ms
2022-03-02 22:33:48.678 [task-6] INFO com.artisan.jobs.AsyncJob:35 - job2 cost 2004 ms
2022-03-02 22:33:48.678 [task-5] INFO com.artisan.jobs.AsyncJob:26 - job1 cost 2004 ms
2022-03-02 22:33:49.004 [http-nio-8080-exec-3] INFO com.artisan.controller.AsyncController:39 - main cost 1008 ms
2022-03-02 22:33:49.393 [task-8] INFO com.artisan.jobs.AsyncJob:35 - job2 cost 2011 ms
2022-03-02 22:33:49.393 [task-7] INFO com.artisan.jobs.AsyncJob:26 - job1 cost 2011 ms
2022-03-02 22:33:50.012 [task-9] INFO com.artisan.jobs.AsyncJob:26 - job1 cost 2015 ms
2022-03-02 22:33:50.012 [task-10] INFO com.artisan.jobs.AsyncJob:35 - job2 cost 2015 ms
可以看到,每次调用都会new一个线程。若系统中不断的创建线程…
Spring提供的线程池
名称 | 说明 |
---|---|
SimpleAsyncTaskExecutor | 这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地 |
ConcurrentTaskExecutor | Executor的适配类,不推荐使用。如ThreadPoolTaskExecutor 不满足要求时,才用考虑使用这个类 |
ThreadPoolTaskScheduler | 可以使用cron表达式 |
ThreadPoolTaskExecutor | 推荐。 是对java.util.concurrent.ThreadPoolExecutor 的包装 |
V2.0 实现@Async的自定义线程池
package com.artisan.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author 小工匠
* @version 1.0
* @description: 使用@EnableAsync来开启异步任务支持,@EnableAsync注解可以直接放在SpringBoot启动类上,也可以单独放在其他配置类上。
* 我们这里选择使用单独的配置类AsyncConfiguration。
* @date 2022/3/1 0:41
* @mark: show me the code , change the world
*/
@Configuration
@EnableAsync
public class ThreadPoolTaskConfig
/**
* 核心线程数(默认线程数)
*/
private static final int CORE_POOL_SIZE = 5;
/**
* 最大线程数
*/
private static final int MAX_POOL_SIZE = 10;
/**
* 允许线程空闲时间(单位:默认为秒)
*/
private static final int KEEP_ALIVE_TIME = 10;
/**
* 缓冲队列大小
*/
private static final int QUEUE_CAPACITY = 200;
/**
* 线程池名前缀
*/
private static final String THREAD_NAME_PREFIX = "Async-Service-";
/**
* 自定义线程池
*
* @return
*/
@Bean("customAsyncPoolTaskExecutor")
public ThreadPoolTaskExecutor taskExecutor()
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(CORE_POOL_SIZE);
executor.setMaxPoolSize(MAX_POOL_SIZE);
executor.setQueueCapacity(KEEP_ALIVE_TIME);
executor.setKeepAliveSeconds(QUEUE_CAPACITY);
executor.setThreadNamePrefix(THREAD_NAME_PREFIX);
/**
* 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
* 通常有以下四种策略:
* ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
* ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
* ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
* ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
*/
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
其他保持不变, 重启测试
V3.0 多个线程池处理
需求: 不同的业务,使用不同的线程池
多个线程池
package com.artisan.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author 小工匠
* @version 1.0
* @description: 使用@EnableAsync来开启异步任务支持,@EnableAsync注解可以直接放在SpringBoot启动类上,也可以单独放在其他配置类上。
* 我们这里选择使用单独的配置类ThreadPoolTaskConfig
* @date 2022/3/1 0:41
* @mark: show me the code , change the world
*/
@Configuration
@EnableAsync
public class ThreadPoolTaskConfig
/**
* 核心线程数(默认线程数)
*/
private static final int CORE_POOL_SIZE = 5;
/**
* 最大线程数
*/
private static final int MAX_POOL_SIZE = 10;
/**
* 允许线程空闲时间(单位:默认为秒)
*/
private static final int KEEP_ALIVE_TIME = 10;
/**
* 缓冲队列大小
*/
private static final int QUEUE_CAPACITY = 200;
/**
* 线程池名前缀
*/
private static final String THREAD_NAME_PREFIX = "Biz1_Async-Service-";
/**
* 线程池名前缀
*/
private static final String THREAD_NAME_PREFIX_2= "Biz2_Async-Service-";
/**
* 自定义线程池
*
* @return
*/
@Bean("tp1")
public ThreadPoolTaskExecutor taskExecutor()
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(CORE_POOL_SIZE);
executor.setMaxPoolSize(MAX_POOL_SIZE);
executor.setQueueCapacity(KEEP_ALIVE_TIME);
executor.setKeepAliveSeconds(QUEUE_CAPACITY);
executor.setThreadNamePrefix(THREAD_NAME_PREFIX);
/**
* 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
* 通常有以下四种策略:
* ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
* ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
* ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
* ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
*/
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
/**
* 自定义线程池
*
* @return
*/
@Bean("tp2")
public ThreadPoolTaskExecutor taskExecutor2()
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(CORE_POOL_SIZE);
executor.setMaxPoolSize(MAX_POOL_SIZE);
executor.setQueueCapacity(KEEP_ALIVE_TIME);
executor.setKeepAliveSeconds(QUEUE_CAPACITY);
executor.setThreadNamePrefix(THREAD_NAME_PREFIX_2);
/**
* 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
* 通常有以下四种策略:
* ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
* ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
* ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
* ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
*/
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
配置 多个线程池, 然后 为@Async指定线程池名字即可实现 多个线程池处理
package com.artisan.jobs;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* @author 小工匠
* @version 1.0
* @description: TODO
* @date 2022/3/1 0:42
* @mark: show me the code , change the world
*/
@Component
@Slf4j
public class AsyncJob
@Async("tp1")
public void job1() throws InterruptedException
long beginTime = System.currentTimeMillis();
Thread.sleep(2000);
long endTime = System.currentTimeMillis();
log.info("job1 cost ms", endTime - beginTime);
@Async("tp2")
public void job2() throws InterruptedException
long beginTime = System.currentTimeMillis();
Thread.sleep(2000);
long endTime = System.currentTimeMillis();
log.info("job2 cost ms", endTime - beginTime);
@Async()
public void job3() throws InterruptedException
long beginTime = System.currentTimeMillis();
Thread.sleep(2000);
long endTime = System.currentTimeMillis();
log.info("job3 cost ms", endTime - beginTime);
默认线程池
@Async()没标注,用哪个?????? 当系统存在多个线程池时,我们也可以配置一个默认线程池 ,配置类让其实现AsyncConfigurer,并重写getAsyncExecutor()方法,指定默认线程池
package com.artisan.multi;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
* <p>
* 实现AsyncConfigurer,并重写getAsyncExecutor()方法,指定默认线程池
*/
@Configuration
@EnableAsync
@Slf4j
public class DefaultAsyncConfiguration implements AsyncConfigurer
/**
* 核心线程数(默认线程数)
*/
private static final int CORE_POOL_SIZE = 2;
/**
* 最大线程数
*/
private static final int MAX_POOL_SIZE = 10;
/**
* 允许线程空闲时间(单位:默认为秒)
*/
private static final int KEEP_ALIVE_TIME = 10;
/**
* 缓冲队列大小
*/
private static final int QUEUE_CAPACITY = 200;
/**
* 线程池名前缀
*/
private static final String THREAD_NAME_PREFIX = "Default_Async-Service-";
@Bean(name = "defaultPool")
public ThreadPoolTaskExecutor executor()
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
taskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
taskExecutor.setQueueCapacity(KEEP_ALIVE_TIME);
taskExecutor.setKeepAliveSeconds(QUEUE_CAPACITY);
taskExecutor.setThreadNamePrefix(THREAD_NAME_PREFIX);
/**
* 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
* 通常有以下四种策略:
* ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
* ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
* ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
* ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
*/
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.initialize();
return taskExecutor;
/**
* 指定默认线程池
*/
@Override
public Executor getAsyncExecutor()
return executor();
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler()
return (ex, method, params) -> log.error("线程池执行任务发横未知错误,执行方法:", method.getName(), ex);
以上是关于SpringBoot - 优雅的实现异步编程的主要内容,如果未能解决你的问题,请参考以下文章