采用线程池进行异步任务处理

Posted 服务端技术杂谈

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了采用线程池进行异步任务处理相关的知识,希望对你有一定的参考价值。

创建线程池

阿里JAVA编码规约,建议采用ThreadPoolExecutor创建线程池。

    private static ExecutorService simpleExecutorService = new ThreadPoolExecutor(            200,            300,            0L,
            TimeUnit.MICROSECONDS,            new LinkedBlockingDeque<Runnable>(10000),            new ThreadPoolExecutor.DiscardPolicy());

在同步操作过程中通过线程池完成异步操作

public void doSomething(final String message) {        simpleExecutorService.execute(new Runnable() {            @Override
            public void run() {                try {
                    Thread.sleep(3000);
                    System.out.println("step 2");
                    System.out.println("message=>" + message);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });        System.out.println("step 1");
    }

进行测试

ThreadUtil threadUtil = new ThreadUtil();
threadUtil.doSomething("a thread pool demo");

输出结果

step 1
step 2message=>a thread pool demo

@Async

在Spring3.x之后框架已经支持采用@Async注解进行异步执行了。

被@Async修饰的方法叫做异步方法,这些异步方法会在新的线程中进行处理,不影响主线程的顺序执行。

无返回值执行

@Component@Slf4jpublic class AsyncTask {    @Async
    public void dealNoReturnTask(){        log.info("Thread {} deal No Return Task start", Thread.currentThread().getName());        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("Thread {} deal No Return Task end at {}", Thread.currentThread().getName(), System.currentTimeMillis());
    }
}

进行调用:

@SpringBootTest(classes = SpringbootApplication.class)@RunWith(SpringJUnit4ClassRunner.class)@Slf4jpublic class AsyncTest {    @Autowired
    private AsyncTask asyncTask;    @Test
    public void testDealNoReturnTask(){
        asyncTask.dealNoReturnTask();        try {
            log.info("begin to deal other Task!");
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

有返回值执行

    @Async
    public Future<String> dealHaveReturnTask() {        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("thread", Thread.currentThread().getName());
        jsonObject.put("time", System.currentTimeMillis());        return new AsyncResult<String>(jsonObject.toJSONString());
    }

判断任务是否取消:

    @Test    public void testDealHaveReturnTask() throws Exception {

        Future<String> future = asyncTask.dealHaveReturnTask();        log.info("begin to deal other Task!");        while (true) {            if(future.isCancelled()){                log.info("deal async task is Cancelled");                break;
            }            if (future.isDone() ) {                log.info("deal async task is Done");                log.info("return result is " + future.get());                break;
            }            log.info("wait async task to end ...");
            Thread.sleep(1000);
        }
    }

异步执行结果异常处理

我们可以实现AsyncConfigurer接口,也可以继承AsyncConfigurerSupport类来实现 在方法getAsyncExecutor()中创建线程池的时候,必须使用 executor.initialize(), 不然在调用时会报线程池未初始化的异常。 如果使用threadPoolTaskExecutor()来定义bean,则不需要初始化

@Configuration@EnableAsync@Slf4jpublic class AsyncConfig implements AsyncConfigurer {//    @Bean//    public ThreadPoolTaskExecutor threadPoolTaskExecutor(){//        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//        executor.setCorePoolSize(10);//        executor.setMaxPoolSize(100);//        executor.setQueueCapacity(100);//        return executor;//    }

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(100);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("AsyncExecutorThread-");
        executor.initialize(); //如果不初始化,导致找到不到执行器
        return executor;
    }    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncExceptionHandler();
    }
}

异步异常处理类:

@Slf4jpublic class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        log.info("Async method: {} has uncaught exception,params:{}", method.getName(), JSON.toJSONString(params));        if (ex instanceof AsyncException) {
            AsyncException asyncException = (AsyncException) ex;
            log.info("asyncException:{}",asyncException.getErrorMessage());
        }

        log.info("Exception :");
        ex.printStackTrace();
    }
}@Data@AllArgsConstructorpublic class AsyncException extends Exception {    private int code;    private String errorMessage;
}
  • 在无返回值的异步调用中,异步处理抛出异常,AsyncExceptionHandler的handleUncaughtException()会捕获指定异常,原有任务还会继续运行,直到结束。

  • 在有返回值的异步调用中,异步处理抛出异常,会直接抛出异常,异步任务结束,原有处理结束执行。

Future或FutureTask

需要结合Callable

public class CallableDemo implements Callable<Integer> {    private int sum;    @Override
    public Integer call() throws Exception {
        System.out.println("Callable子线程开始计算啦!");
        Thread.sleep(2000);        for(int i=0 ;i<5000;i++){
            sum=sum+i;
        }
        System.out.println("Callable子线程计算结束!");        return sum;
    }
}

Future模式

        //创建线程池
        ExecutorService es = Executors.newSingleThreadExecutor();        //创建Callable对象任务
        CallableDemo calTask = new CallableDemo();        //提交任务并获取执行结果
        Future<Integer> future = es.submit(calTask);        //关闭线程池
        es.shutdown();        try {
            System.out.println("主线程在执行其他任务");            if (future.get() != null) {                //输出获取到的结果
                System.out.println("future.get()-->" + future.get());
            } else {                //输出获取到的结果
                System.out.println("future.get()未获取到结果");
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("主线程在执行完成");

FutureTask模式

        //创建线程池
        ExecutorService es = Executors.newSingleThreadExecutor();        //创建Callable对象任务
        CallableDemo calTask = new CallableDemo();        //创建FutureTask
        FutureTask<Integer> future = new FutureTask<>(calTask);        // future.run();  // 由于FutureTask继承于Runable,所以也可以直接调用run方法执行
        //执行任务
        es.submit(future); // 效果同上面直接调用run方法

        //关闭线程池
        es.shutdown();        try {
            System.out.println("主线程在执行其他任务");            if (future.get() != null) {                //输出获取到的结果
                System.out.println("future.get()-->" + future.get());
            } else {                //输出获取到的结果
                System.out.println("future.get()未获取到结果");
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("主线程在执行完成");

归并异步执行结果

public class FutureDemo{ 
     public static void main(String[] args)  {
         Long start = System.currentTimeMillis();         //开启多线程
         ExecutorService exs = Executors.newFixedThreadPool(10);         try {             //结果集
             List<Integer> list = new ArrayList<Integer>();
             List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();             //1.高速提交10个任务,每个任务返回一个Future入list
             for(int i=0;i<10;i++){
                 futureList.add(exs.submit(new CallableTask(i+1)));
             }

             Long getResultStart = System.currentTimeMillis();
             System.out.println("结果归集开始时间="+new Date());             //2.结果归集,遍历futureList,高速轮询(模拟实现了并发)获取future状态成功完成后获取结果,退出当前循环
             for (Future<Integer> future : futureList) {                  //CPU高速轮询:每个future都并发轮循,判断完成状态然后获取结果,这一行,是本实现方案的精髓所在。即有10个future在高速轮询,完成一个future的获取结果,就关闭一个轮询
                 while (true) {                    //获取future成功完成状态,如果想要限制每个任务的超时时间,取消本行的状态判断+future.get(1000*1, TimeUnit.MILLISECONDS)+catch超时异常使用即可。 
                    if (future.isDone()&& !future.isCancelled()) {
                         Integer i = future.get();//获取结果
                         System.out.println("任务i="+i+"获取完成!"+new Date());
                         list.add(i);                         break;//当前future获取结果完毕,跳出while
                     } else {                          //每次轮询休息1毫秒(CPU纳秒级),避免CPU高速轮循耗空CPU---》新手别忘记这个
                         Thread.sleep(1);
                     }
                 }
             }

             System.out.println("list="+list);

             System.out.println("总耗时="+(System.currentTimeMillis()-start)+",取结果归集耗时="+(System.currentTimeMillis()-getResultStart));
            
         } catch (Exception e) {
             e.printStackTrace();
         } finally {
             exs.shutdown();
         }
     }
     static class CallableTask implements Callable<Integer>{
         Integer i;         
         public CallableTask(Integer i) {             super();             this.i=i;
         } 
         @Override
         public Integer call() throws Exception {             if(i==1){
                 Thread.sleep(3000);//任务1耗时3秒
             }else if(i==5){
                 Thread.sleep(5000);//任务5耗时5秒
             }else{
                 Thread.sleep(1000);//其它任务耗时1秒
             }
             System.out.println("task线程:"+Thread.currentThread().getName()+"任务i="+i+",完成!");  
             return i;
         }
     }
 }


以上是关于采用线程池进行异步任务处理的主要内容,如果未能解决你的问题,请参考以下文章

异步回调,事件,线程池与协程

Spring Boot中如何配置线程池拒绝策略,妥善处理好溢出的任务

Spring Boot中如何配置线程池拒绝策略,妥善处理好溢出的任务

SpringBoot自定义异步任务线程池

C++11实现半同步半异步的线程池

Java异步编排结合线程池实现多任务按需执行案例