Spring的线程池技术:ThreadPoolTaskExecutor

Posted kehuaihan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring的线程池技术:ThreadPoolTaskExecutor相关的知识,希望对你有一定的参考价值。

技术分享图片
  1 package threadPool;
  2 
  3 import java.util.ArrayList;
  4 import java.util.List;
  5 import java.util.Map;
  6 import java.util.concurrent.ExecutionException;
  7 import java.util.concurrent.FutureTask;
  8 import java.util.concurrent.TimeUnit;
  9 import java.util.concurrent.TimeoutException;
 10 
 11 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 12 
 13 public class ThreadPoolTest {
 14     
 15     // 通过依赖注入注入进来(service层接口或者外围系统接口)
 16     private BusinessService service;  // = new BusinessServiceImpl();
 17     // 通过依赖注入注入进来(Spring提供的线程池类)
 18     private ThreadPoolTaskExecutor taskThreadPool = new ThreadPoolTaskExecutor();
 19 
 20     // 业务方法,供别人调用,真实场景会有参数传进来
 21     public TaskResp getTasks() {
 22         
 23         // 模拟入参
 24         List<FirstTaskReq> firstReqList = new ArrayList<FirstTaskReq>();
 25         List<SecondTaskReq> secondReqList = new ArrayList<SecondTaskReq>();
 26         List<ThirdTaskReq> thirdReqList = new ArrayList<ThirdTaskReq>();
 27         
 28         // 定义响应
 29         TaskResp taskResp = new TaskResp();
 30         List<FirstTaskResp> firstTaskRespList = new ArrayList<FirstTaskResp>();
 31         List<SecondTaskResp> secondTaskRespList = new ArrayList<SecondTaskResp>();
 32         List<ThirdTaskResp> thirdTaskRespList = new ArrayList<ThirdTaskResp>();
 33         
 34         // 定义接收任务结果的集合
 35         ArrayList<FutureTask<Map<String,Object>>> taskList = new ArrayList<FutureTask<Map<String,Object>>>();
 36         
 37         if(!firstReqList.isEmpty()) {
 38             // 通过线程池进行并发操作
 39             for(FirstTaskReq req : firstReqList) {
 40                 FutureTask<Map<String, Object>> task = new FutureTask<Map<String, Object>>(new FirstTask(req, service));
 41                 taskThreadPool.execute(task);
 42                 // 接收FutrueTask的返回结果
 43                 taskList.add(task);
 44             }
 45         }
 46         
 47         if(!secondReqList.isEmpty()) {
 48             // 通过线程池进行并发操作
 49             for(SecondTaskReq req : secondReqList) {
 50                 FutureTask<Map<String, Object>> task = new FutureTask<Map<String, Object>>(new SecondTask(req, service));
 51                 taskThreadPool.execute(task);
 52                 // 接收FutrueTask的返回结果
 53                 taskList.add(task);
 54             }
 55         }
 56 
 57         if(!secondReqList.isEmpty()) {
 58             // 通过线程池进行并发操作
 59             for(ThirdTaskReq req : thirdReqList) {
 60                 FutureTask<Map<String, Object>> task = new FutureTask<Map<String, Object>>(new ThirdTask(req, service));
 61                 taskThreadPool.execute(task);
 62                 // 接收FutrueTask的返回结果
 63                 taskList.add(task);
 64             }
 65         }
 66         
 67         int outTime = 100;
 68         for(FutureTask<Map<String, Object>> task : taskList) {
 69             
 70             try {
 71                 // 获取每一个任务执行结果(每次获取都要等待100毫秒)
 72                 Map<String, Object> returnTasks = task.get(outTime, TimeUnit.MILLISECONDS);
 73                 // 进行业务处理
 74                 if(returnTasks != null && !returnTasks.isEmpty()) {
 75                     for(Map.Entry<String, Object> returnTask : returnTasks.entrySet()){
 76                         if(returnTask != null && returnTask.getKey()!=null){
 77                             String futureKey = returnTask.getKey();
 78                             if(futureKey.contains("firstTask")){
 79                                 FirstTaskResp firstTaskResp = (FirstTaskResp) returnTask.getValue();
 80                                 firstTaskRespList.add(firstTaskResp);
 81                             }else if(futureKey.contains("secondTask")){
 82                                 SecondTaskResp secondTaskResp = (SecondTaskResp) returnTask.getValue();
 83                                 secondTaskRespList.add(secondTaskResp);
 84                             }else if(futureKey.contains("thirdTask")){
 85                                 ThirdTaskResp thirdTaskResp = (ThirdTaskResp) returnTask.getValue();
 86                                 thirdTaskRespList.add(thirdTaskResp);
 87                             }
 88                         }
 89                     }
 90                 }
 91                 
 92             } catch (InterruptedException e) {
 93                 e.printStackTrace();
 94             } catch (ExecutionException e) {
 95                 e.printStackTrace();
 96             } catch (TimeoutException e) {
 97                 e.printStackTrace();
 98             }
 99         }
100         
101         taskResp.setFirstTaskRespList(firstTaskRespList);
102         taskResp.setSecondTaskRespList(secondTaskRespList);
103         taskResp.setThirdTaskRespList(thirdTaskRespList);
104         
105         return taskResp;
106     }
107 }
ThreadPoolTest
技术分享图片
 1 package threadPool;
 2 
 3 import java.util.List;
 4 
 5 public class TaskResp {
 6     
 7     private List<FirstTaskResp> firstTaskRespList;
 8     private List<SecondTaskResp> secondTaskRespList;
 9     private List<ThirdTaskResp> thirdTaskRespList;
10     
11     public List<FirstTaskResp> getFirstTaskRespList() {
12         return firstTaskRespList;
13     }
14     public void setFirstTaskRespList(List<FirstTaskResp> firstTaskRespList) {
15         this.firstTaskRespList = firstTaskRespList;
16     }
17     public List<SecondTaskResp> getSecondTaskRespList() {
18         return secondTaskRespList;
19     }
20     public void setSecondTaskRespList(List<SecondTaskResp> secondTaskRespList) {
21         this.secondTaskRespList = secondTaskRespList;
22     }
23     public List<ThirdTaskResp> getThirdTaskRespList() {
24         return thirdTaskRespList;
25     }
26     public void setThirdTaskRespList(List<ThirdTaskResp> thirdTaskRespList) {
27         this.thirdTaskRespList = thirdTaskRespList;
28     }
29 
30 }
TaskResp

 

技术分享图片
 1 package threadPool;
 2 
 3 import java.util.HashMap;
 4 import java.util.Map;
 5 import java.util.concurrent.Callable;
 6 
 7 public class FirstTask implements Callable<Map<String, Object>> {
 8     
 9     // 请求对象
10     private RequestParam param;
11     // service层接口或者外围系统接口
12     private BusinessService service;
13     
14     // 构造函数,接收入参
15     public FirstTask(RequestParam param, BusinessService service) {
16         this.param = param;
17         this.service = service;
18     }
19 
20     @Override
21     public Map<String, Object> call() throws Exception {
22         
23         Map<String, Object> resultMap = new HashMap<String, Object>();
24         // 进行接口调用
25         BusinessResp businessResp = service.handleBusiness(param);
26         // 组装返回值
27         resultMap.put("firstTask", businessResp.getFirstTaskResp());
28         
29         return resultMap;
30     }
31 
32 }
FirstTask
技术分享图片
 1 package threadPool;
 2 
 3 import java.util.HashMap;
 4 import java.util.Map;
 5 import java.util.concurrent.Callable;
 6 
 7 public class SecondTask implements Callable<Map<String, Object>> {
 8     
 9     // 请求对象
10     private RequestParam param;
11     // service层接口或者外围系统接口
12     private BusinessService service;
13     
14     // 构造函数,接收入参
15     public SecondTask(RequestParam param, BusinessService service) {
16         this.param = param;
17         this.service = service;
18     }
19 
20     @Override
21     public Map<String, Object> call() throws Exception {
22         
23         Map<String, Object> resultMap = new HashMap<String, Object>();
24         // 进行接口调用
25         BusinessResp businessResp = service.handleBusiness(param);
26         // 组装返回值
27         resultMap.put("secondTask", businessResp.getSecondTaskResp());
28         
29         return resultMap;
30     }
31 
32 }
SecondTask
技术分享图片
 1 package threadPool;
 2 
 3 import java.util.HashMap;
 4 import java.util.Map;
 5 import java.util.concurrent.Callable;
 6 
 7 public class ThirdTask implements Callable<Map<String, Object>> {
 8     
 9     // 请求对象
10     private RequestParam param;
11     // service层接口或者外围系统接口
12     private BusinessService service;
13     
14     // 构造函数,接收入参
15     public ThirdTask(RequestParam param, BusinessService service) {
16         this.param = param;
17         this.service = service;
18     }
19 
20     @Override
21     public Map<String, Object> call() throws Exception {
22         
23         Map<String, Object> resultMap = new HashMap<String, Object>();
24         // 进行接口调用
25         BusinessResp businessResp = service.handleBusiness(param);
26         // 组装返回值
27         resultMap.put("thirdTask", businessResp.getThirdTaskResp());
28         
29         return resultMap;
30     }
31 
32 }
ThirdTask

 

技术分享图片
1 package threadPool;
2 
/**
 * Base type for the request objects accepted by {@code BusinessService#handleBusiness}
 * and by the task constructors. Declares no members of its own.
 */
public class RequestParam {
}
RequestParam
技术分享图片
1 package threadPool;
2 
3 public class FirstTaskReq extends RequestParam {
4 
5 }
FirstTaskReq
技术分享图片
1 package threadPool;
2 
3 public class FirstTaskResp extends TaskResp{
4 
5 }
FirstTaskResp
技术分享图片
1 package threadPool;
2 
3 public class SecondTaskReq extends RequestParam {
4 
5 }
SecondTaskReq
技术分享图片
1 package threadPool;
2 
3 public class SecondTaskResp extends TaskResp{
4 
5 }
SecondTaskResp
技术分享图片
1 package threadPool;
2 
3 public class ThirdTaskReq extends RequestParam {
4 
5 }
ThirdTaskReq
技术分享图片
1 package threadPool;
2 
3 public class ThirdTaskResp extends TaskResp{
4 
5 }
ThirdTaskResp

 

技术分享图片
1 package threadPool;
2 
3 public interface BusinessService {
4 
5     BusinessResp handleBusiness(RequestParam requestParam);
6 }
BusinessService
技术分享图片
 1 package threadPool;
 2 
 3 public class BusinessServiceImpl implements BusinessService {
 4 
 5     @Override
 6     public BusinessResp handleBusiness(RequestParam requestParam) {
 7         return null;
 8     }
 9 
10 }
BusinessServiceImpl
技术分享图片
 1 package threadPool;
 2 
 3 public class BusinessResp {
 4 
 5     private FirstTaskResp firstTaskResp;
 6     private SecondTaskResp secondTaskResp;
 7     private ThirdTaskResp thirdTaskResp;
 8     
 9     public FirstTaskResp getFirstTaskResp() {
10         return firstTaskResp;
11     }
12     public void setFirstTaskResp(FirstTaskResp firstTaskResp) {
13         this.firstTaskResp = firstTaskResp;
14     }
15     public SecondTaskResp getSecondTaskResp() {
16         return secondTaskResp;
17     }
18     public void setSecondTaskResp(SecondTaskResp secondTaskResp) {
19         this.secondTaskResp = secondTaskResp;
20     }
21     public ThirdTaskResp getThirdTaskResp() {
22         return thirdTaskResp;
23     }
24     public void setThirdTaskResp(ThirdTaskResp thirdTaskResp) {
25         this.thirdTaskResp = thirdTaskResp;
26     }
27     
28 }
BusinessResp

 

技术分享图片
 <!-- Thread pool bean for the taskThreadPool field of ThreadPoolTest -->
 <bean id="taskThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >
     <!-- Number of core threads -->
     <property name="corePoolSize" value="8" />
     <!-- Maximum number of threads -->
     <property name="maxPoolSize" value="16" />
     <!-- Maximum queue length; original note: ">= mainExecutor.maxSize" (mainExecutor is not defined in this listing) -->
     <property name="queueCapacity" value="200" />
     <!-- Idle time, in seconds, that the pool allows a thread before releasing it -->
     <property name="keepAliveSeconds" value="300" />
     <!-- Policy applied to rejected tasks (no thread available) -->
     <property name="rejectedExecutionHandler">
         <bean class="java.util.concurrent.ThreadPoolExecutor$AbortPolicy" />
     </property>
 </bean>
配置文件内容

以上是关于Spring的线程池技术:ThreadPoolTaskExecutor的主要内容,如果未能解决你的问题,请参考以下文章:

Spring Boot中异步线程池@Async详解

如何使用Spring开发和监控线程池服务

Spring线程池配置

Spring线程池ThreadPoolTaskExecutor学习总结

Spring Cloud Gateway中netty线程池优化

分享知识-快乐自己:Spring线程池配置