Spring的线程池技术:ThreadPoolTaskExecutor
Posted kehuaihan
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring的线程池技术:ThreadPoolTaskExecutor相关的知识,希望对你有一定的参考价值。
1 package threadPool; 2 3 import java.util.ArrayList; 4 import java.util.List; 5 import java.util.Map; 6 import java.util.concurrent.ExecutionException; 7 import java.util.concurrent.FutureTask; 8 import java.util.concurrent.TimeUnit; 9 import java.util.concurrent.TimeoutException; 10 11 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; 12 13 public class ThreadPoolTest { 14 15 // 通过依赖注入注入进来(service层接口或者外围系统接口) 16 private BusinessService service; // = new BusinessServiceImpl(); 17 // 通过依赖注入注入进来(Spring提供的线程池类) 18 private ThreadPoolTaskExecutor taskThreadPool = new ThreadPoolTaskExecutor(); 19 20 // 业务方法,供别人调用,真实场景会有参数传进来 21 public TaskResp getTasks() { 22 23 // 模拟入参 24 List<FirstTaskReq> firstReqList = new ArrayList<FirstTaskReq>(); 25 List<SecondTaskReq> secondReqList = new ArrayList<SecondTaskReq>(); 26 List<ThirdTaskReq> thirdReqList = new ArrayList<ThirdTaskReq>(); 27 28 // 定义响应 29 TaskResp taskResp = new TaskResp(); 30 List<FirstTaskResp> firstTaskRespList = new ArrayList<FirstTaskResp>(); 31 List<SecondTaskResp> secondTaskRespList = new ArrayList<SecondTaskResp>(); 32 List<ThirdTaskResp> thirdTaskRespList = new ArrayList<ThirdTaskResp>(); 33 34 // 定义接收任务结果的集合 35 ArrayList<FutureTask<Map<String,Object>>> taskList = new ArrayList<FutureTask<Map<String,Object>>>(); 36 37 if(!firstReqList.isEmpty()) { 38 // 通过线程池进行并发操作 39 for(FirstTaskReq req : firstReqList) { 40 FutureTask<Map<String, Object>> task = new FutureTask<Map<String, Object>>(new FirstTask(req, service)); 41 taskThreadPool.execute(task); 42 // 接收FutrueTask的返回结果 43 taskList.add(task); 44 } 45 } 46 47 if(!secondReqList.isEmpty()) { 48 // 通过线程池进行并发操作 49 for(SecondTaskReq req : secondReqList) { 50 FutureTask<Map<String, Object>> task = new FutureTask<Map<String, Object>>(new SecondTask(req, service)); 51 taskThreadPool.execute(task); 52 // 接收FutrueTask的返回结果 53 taskList.add(task); 54 } 55 } 56 57 if(!secondReqList.isEmpty()) { 58 // 通过线程池进行并发操作 59 for(ThirdTaskReq req : thirdReqList) { 60 FutureTask<Map<String, Object>> task = new FutureTask<Map<String, Object>>(new ThirdTask(req, service)); 61 taskThreadPool.execute(task); 62 // 接收FutrueTask的返回结果 63 taskList.add(task); 64 } 65 } 66 67 int outTime = 100; 68 for(FutureTask<Map<String, Object>> task : taskList) { 69 70 try { 71 // 获取每一个任务执行结果(每次获取都要等待100毫秒) 72 Map<String, Object> returnTasks = task.get(outTime, TimeUnit.MILLISECONDS); 73 // 进行业务处理 74 if(returnTasks != null && !returnTasks.isEmpty()) { 75 for(Map.Entry<String, Object> returnTask : returnTasks.entrySet()){ 76 if(returnTask != null && returnTask.getKey()!=null){ 77 String futureKey = returnTask.getKey(); 78 if(futureKey.contains("firstTask")){ 79 FirstTaskResp firstTaskResp = (FirstTaskResp) returnTask.getValue(); 80 firstTaskRespList.add(firstTaskResp); 81 }else if(futureKey.contains("secondTask")){ 82 SecondTaskResp secondTaskResp = (SecondTaskResp) returnTask.getValue(); 83 secondTaskRespList.add(secondTaskResp); 84 }else if(futureKey.contains("thirdTask")){ 85 ThirdTaskResp thirdTaskResp = (ThirdTaskResp) returnTask.getValue(); 86 thirdTaskRespList.add(thirdTaskResp); 87 } 88 } 89 } 90 } 91 92 } catch (InterruptedException e) { 93 e.printStackTrace(); 94 } catch (ExecutionException e) { 95 e.printStackTrace(); 96 } catch (TimeoutException e) { 97 e.printStackTrace(); 98 } 99 } 100 101 taskResp.setFirstTaskRespList(firstTaskRespList); 102 taskResp.setSecondTaskRespList(secondTaskRespList); 103 taskResp.setThirdTaskRespList(thirdTaskRespList); 104 105 return taskResp; 106 } 107 }
1 package threadPool; 2 3 import java.util.List; 4 5 public class TaskResp { 6 7 private List<FirstTaskResp> firstTaskRespList; 8 private List<SecondTaskResp> secondTaskRespList; 9 private List<ThirdTaskResp> thirdTaskRespList; 10 11 public List<FirstTaskResp> getFirstTaskRespList() { 12 return firstTaskRespList; 13 } 14 public void setFirstTaskRespList(List<FirstTaskResp> firstTaskRespList) { 15 this.firstTaskRespList = firstTaskRespList; 16 } 17 public List<SecondTaskResp> getSecondTaskRespList() { 18 return secondTaskRespList; 19 } 20 public void setSecondTaskRespList(List<SecondTaskResp> secondTaskRespList) { 21 this.secondTaskRespList = secondTaskRespList; 22 } 23 public List<ThirdTaskResp> getThirdTaskRespList() { 24 return thirdTaskRespList; 25 } 26 public void setThirdTaskRespList(List<ThirdTaskResp> thirdTaskRespList) { 27 this.thirdTaskRespList = thirdTaskRespList; 28 } 29 30 }
1 package threadPool; 2 3 import java.util.HashMap; 4 import java.util.Map; 5 import java.util.concurrent.Callable; 6 7 public class FirstTask implements Callable<Map<String, Object>> { 8 9 // 请求对象 10 private RequestParam param; 11 // service层接口或者外围系统接口 12 private BusinessService service; 13 14 // 构造函数,接收入参 15 public FirstTask(RequestParam param, BusinessService service) { 16 this.param = param; 17 this.service = service; 18 } 19 20 @Override 21 public Map<String, Object> call() throws Exception { 22 23 Map<String, Object> resultMap = new HashMap<String, Object>(); 24 // 进行接口调用 25 BusinessResp businessResp = service.handleBusiness(param); 26 // 组装返回值 27 resultMap.put("firstTask", businessResp.getFirstTaskResp()); 28 29 return resultMap; 30 } 31 32 }
1 package threadPool; 2 3 import java.util.HashMap; 4 import java.util.Map; 5 import java.util.concurrent.Callable; 6 7 public class SecondTask implements Callable<Map<String, Object>> { 8 9 // 请求对象 10 private RequestParam param; 11 // service层接口或者外围系统接口 12 private BusinessService service; 13 14 // 构造函数,接收入参 15 public SecondTask(RequestParam param, BusinessService service) { 16 this.param = param; 17 this.service = service; 18 } 19 20 @Override 21 public Map<String, Object> call() throws Exception { 22 23 Map<String, Object> resultMap = new HashMap<String, Object>(); 24 // 进行接口调用 25 BusinessResp businessResp = service.handleBusiness(param); 26 // 组装返回值 27 resultMap.put("secondTask", businessResp.getSecondTaskResp()); 28 29 return resultMap; 30 } 31 32 }
1 package threadPool; 2 3 import java.util.HashMap; 4 import java.util.Map; 5 import java.util.concurrent.Callable; 6 7 public class ThirdTask implements Callable<Map<String, Object>> { 8 9 // 请求对象 10 private RequestParam param; 11 // service层接口或者外围系统接口 12 private BusinessService service; 13 14 // 构造函数,接收入参 15 public ThirdTask(RequestParam param, BusinessService service) { 16 this.param = param; 17 this.service = service; 18 } 19 20 @Override 21 public Map<String, Object> call() throws Exception { 22 23 Map<String, Object> resultMap = new HashMap<String, Object>(); 24 // 进行接口调用 25 BusinessResp businessResp = service.handleBusiness(param); 26 // 组装返回值 27 resultMap.put("thirdTask", businessResp.getThirdTaskResp()); 28 29 return resultMap; 30 } 31 32 }
1 package threadPool; 2 3 public class RequestParam { 4 5 }
1 package threadPool; 2 3 public class FirstTaskReq extends RequestParam { 4 5 }
1 package threadPool; 2 3 public class FirstTaskResp extends TaskResp{ 4 5 }
1 package threadPool; 2 3 public class SecondTaskReq extends RequestParam { 4 5 }
1 package threadPool; 2 3 public class SecondTaskResp extends TaskResp{ 4 5 }
1 package threadPool; 2 3 public class ThirdTaskReq extends RequestParam { 4 5 }
1 package threadPool; 2 3 public class ThirdTaskResp extends TaskResp{ 4 5 }
1 package threadPool; 2 3 public interface BusinessService { 4 5 BusinessResp handleBusiness(RequestParam requestParam); 6 }
1 package threadPool; 2 3 public class BusinessServiceImpl implements BusinessService { 4 5 @Override 6 public BusinessResp handleBusiness(RequestParam requestParam) { 7 return null; 8 } 9 10 }
1 package threadPool; 2 3 public class BusinessResp { 4 5 private FirstTaskResp firstTaskResp; 6 private SecondTaskResp secondTaskResp; 7 private ThirdTaskResp thirdTaskResp; 8 9 public FirstTaskResp getFirstTaskResp() { 10 return firstTaskResp; 11 } 12 public void setFirstTaskResp(FirstTaskResp firstTaskResp) { 13 this.firstTaskResp = firstTaskResp; 14 } 15 public SecondTaskResp getSecondTaskResp() { 16 return secondTaskResp; 17 } 18 public void setSecondTaskResp(SecondTaskResp secondTaskResp) { 19 this.secondTaskResp = secondTaskResp; 20 } 21 public ThirdTaskResp getThirdTaskResp() { 22 return thirdTaskResp; 23 } 24 public void setThirdTaskResp(ThirdTaskResp thirdTaskResp) { 25 this.thirdTaskResp = thirdTaskResp; 26 } 27 28 }
1 <bean id="taskThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" > 2 <!-- 核心线程数 --> 3 <property name="corePoolSize" value="8" /> 4 <!-- 最大线程数 --> 5 <property name="maxPoolSize" value="16" /> 6 <!-- 队列最大长度 >=mainExecutor.maxSize --> 7 <property name="queueCapacity" value="200" /> 8 <!-- 线程池维护线程所允许的空闲时间 --> 9 <property name="keepAliveSeconds" value="300" /> 10 <!-- 线程池对拒绝任务(无线程可用)的处理策略 --> 11 <property name="rejectedExecutionHandler"> 12 <bean class="java.util.concurrent.ThreadPoolExecutor$AbortPolicy" /> 13 </property> 14 </bean>
以上是关于Spring的线程池技术:ThreadPoolTaskExecutor的主要内容,如果未能解决你的问题,请参考以下文章
Spring线程池ThreadPoolTaskExecutor学习总结