采用线程池进行异步任务处理
Posted 服务端技术杂谈
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了采用线程池进行异步任务处理相关的知识,希望对你有一定的参考价值。
创建线程池
阿里JAVA编码规约,建议采用ThreadPoolExecutor创建线程池。
private static ExecutorService simpleExecutorService = new ThreadPoolExecutor( 200, 300, 0L,
TimeUnit.MICROSECONDS, new LinkedBlockingDeque<Runnable>(10000), new ThreadPoolExecutor.DiscardPolicy());
在同步操作过程中通过线程池完成异步操作
public void doSomething(final String message) { simpleExecutorService.execute(new Runnable() { @Override
public void run() { try {
Thread.sleep(3000);
System.out.println("step 2");
System.out.println("message=>" + message);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}); System.out.println("step 1");
}
进行测试
ThreadUtil threadUtil = new ThreadUtil();
threadUtil.doSomething("a thread pool demo");
输出结果
step 1
step 2message=>a thread pool demo
@Async
在Spring3.x之后框架已经支持采用@Async注解进行异步执行了。
被@Async修饰的方法叫做异步方法,这些异步方法会在新的线程中进行处理,不影响主线程的顺序执行。
无返回值执行
@Component@Slf4jpublic class AsyncTask { @Async
public void dealNoReturnTask(){ log.info("Thread {} deal No Return Task start", Thread.currentThread().getName()); try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("Thread {} deal No Return Task end at {}", Thread.currentThread().getName(), System.currentTimeMillis());
}
}
进行调用:
@SpringBootTest(classes = SpringbootApplication.class)@RunWith(SpringJUnit4ClassRunner.class)@Slf4jpublic class AsyncTest { @Autowired
private AsyncTask asyncTask; @Test
public void testDealNoReturnTask(){
asyncTask.dealNoReturnTask(); try {
log.info("begin to deal other Task!");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
有返回值执行
@Async
public Future<String> dealHaveReturnTask() { try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
JSONObject jsonObject = new JSONObject();
jsonObject.put("thread", Thread.currentThread().getName());
jsonObject.put("time", System.currentTimeMillis()); return new AsyncResult<String>(jsonObject.toJSONString());
}
判断任务是否取消:
@Test public void testDealHaveReturnTask() throws Exception {
Future<String> future = asyncTask.dealHaveReturnTask(); log.info("begin to deal other Task!"); while (true) { if(future.isCancelled()){ log.info("deal async task is Cancelled"); break;
} if (future.isDone() ) { log.info("deal async task is Done"); log.info("return result is " + future.get()); break;
} log.info("wait async task to end ...");
Thread.sleep(1000);
}
}
异步执行结果异常处理
我们可以实现AsyncConfigurer接口,也可以继承AsyncConfigurerSupport类来实现 在方法getAsyncExecutor()中创建线程池的时候,必须使用 executor.initialize(), 不然在调用时会报线程池未初始化的异常。 如果使用threadPoolTaskExecutor()来定义bean,则不需要初始化
@Configuration@EnableAsync@Slf4jpublic class AsyncConfig implements AsyncConfigurer {// @Bean// public ThreadPoolTaskExecutor threadPoolTaskExecutor(){// ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// executor.setCorePoolSize(10);// executor.setMaxPoolSize(100);// executor.setQueueCapacity(100);// return executor;// }
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("AsyncExecutorThread-");
executor.initialize(); //如果不初始化,导致找到不到执行器
return executor;
} @Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncExceptionHandler();
}
}
异步异常处理类:
@Slf4jpublic class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler { @Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
log.info("Async method: {} has uncaught exception,params:{}", method.getName(), JSON.toJSONString(params)); if (ex instanceof AsyncException) {
AsyncException asyncException = (AsyncException) ex;
log.info("asyncException:{}",asyncException.getErrorMessage());
}
log.info("Exception :");
ex.printStackTrace();
}
}@Data@AllArgsConstructorpublic class AsyncException extends Exception { private int code; private String errorMessage;
}
在无返回值的异步调用中,异步处理抛出异常,AsyncExceptionHandler的handleUncaughtException()会捕获指定异常,原有任务还会继续运行,直到结束。
在有返回值的异步调用中,异步处理抛出异常,会直接抛出异常,异步任务结束,原有处理结束执行。
Future或FutureTask
需要结合Callable
public class CallableDemo implements Callable<Integer> { private int sum; @Override
public Integer call() throws Exception {
System.out.println("Callable子线程开始计算啦!");
Thread.sleep(2000); for(int i=0 ;i<5000;i++){
sum=sum+i;
}
System.out.println("Callable子线程计算结束!"); return sum;
}
}
Future模式
//创建线程池
ExecutorService es = Executors.newSingleThreadExecutor(); //创建Callable对象任务
CallableDemo calTask = new CallableDemo(); //提交任务并获取执行结果
Future<Integer> future = es.submit(calTask); //关闭线程池
es.shutdown(); try {
System.out.println("主线程在执行其他任务"); if (future.get() != null) { //输出获取到的结果
System.out.println("future.get()-->" + future.get());
} else { //输出获取到的结果
System.out.println("future.get()未获取到结果");
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("主线程在执行完成");
FutureTask模式
//创建线程池
ExecutorService es = Executors.newSingleThreadExecutor(); //创建Callable对象任务
CallableDemo calTask = new CallableDemo(); //创建FutureTask
FutureTask<Integer> future = new FutureTask<>(calTask); // future.run(); // 由于FutureTask继承于Runable,所以也可以直接调用run方法执行
//执行任务
es.submit(future); // 效果同上面直接调用run方法
//关闭线程池
es.shutdown(); try {
System.out.println("主线程在执行其他任务"); if (future.get() != null) { //输出获取到的结果
System.out.println("future.get()-->" + future.get());
} else { //输出获取到的结果
System.out.println("future.get()未获取到结果");
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("主线程在执行完成");
归并异步执行结果
public class FutureDemo{
public static void main(String[] args) {
Long start = System.currentTimeMillis(); //开启多线程
ExecutorService exs = Executors.newFixedThreadPool(10); try { //结果集
List<Integer> list = new ArrayList<Integer>();
List<Future<Integer>> futureList = new ArrayList<Future<Integer>>(); //1.高速提交10个任务,每个任务返回一个Future入list
for(int i=0;i<10;i++){
futureList.add(exs.submit(new CallableTask(i+1)));
}
Long getResultStart = System.currentTimeMillis();
System.out.println("结果归集开始时间="+new Date()); //2.结果归集,遍历futureList,高速轮询(模拟实现了并发)获取future状态成功完成后获取结果,退出当前循环
for (Future<Integer> future : futureList) { //CPU高速轮询:每个future都并发轮循,判断完成状态然后获取结果,这一行,是本实现方案的精髓所在。即有10个future在高速轮询,完成一个future的获取结果,就关闭一个轮询
while (true) { //获取future成功完成状态,如果想要限制每个任务的超时时间,取消本行的状态判断+future.get(1000*1, TimeUnit.MILLISECONDS)+catch超时异常使用即可。
if (future.isDone()&& !future.isCancelled()) {
Integer i = future.get();//获取结果
System.out.println("任务i="+i+"获取完成!"+new Date());
list.add(i); break;//当前future获取结果完毕,跳出while
} else { //每次轮询休息1毫秒(CPU纳秒级),避免CPU高速轮循耗空CPU---》新手别忘记这个
Thread.sleep(1);
}
}
}
System.out.println("list="+list);
System.out.println("总耗时="+(System.currentTimeMillis()-start)+",取结果归集耗时="+(System.currentTimeMillis()-getResultStart));
} catch (Exception e) {
e.printStackTrace();
} finally {
exs.shutdown();
}
}
static class CallableTask implements Callable<Integer>{
Integer i;
public CallableTask(Integer i) { super(); this.i=i;
}
@Override
public Integer call() throws Exception { if(i==1){
Thread.sleep(3000);//任务1耗时3秒
}else if(i==5){
Thread.sleep(5000);//任务5耗时5秒
}else{
Thread.sleep(1000);//其它任务耗时1秒
}
System.out.println("task线程:"+Thread.currentThread().getName()+"任务i="+i+",完成!");
return i;
}
}
}
以上是关于采用线程池进行异步任务处理的主要内容,如果未能解决你的问题,请参考以下文章
Spring Boot中如何配置线程池拒绝策略,妥善处理好溢出的任务