springboot+@async异步线程池的配置及应用
Posted gendway
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot+@async异步线程池的配置及应用相关的知识,希望对你有一定的参考价值。
示例:
1、 配置
@EnableAsync @Configuration public class TaskExecutorConfiguration { @Autowired private TaskExecutorProperties taskExecutorProperties; @Bean public Executor routeGen() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(taskExecutorProperties.getCorePoolSize()); //50 executor.setQueueCapacity(taskExecutorProperties.getQueueCapacity()); //200 executor.setMaxPoolSize(taskExecutorProperties.getMaxPoolSize()); //500 executor.setKeepAliveSeconds(taskExecutorProperties.getKeepAliveSeconds()); executor.setThreadNamePrefix("gen-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); executor.initialize(); return executor; }
}
2、 运用(作用于方法上)
@Async("routeGen")
public Future<Result<String>> genRouteByCategory(RouteGenDTO routeGenDTO, List<String> cityList, String category){
}
3、 异常处理及日志记录
public Result<String> genRouteByShard(RouteGenDTO routeGenDTO) throws RouteGenException { // 根据分片获取城市 Result<List<String>> cityResult = cityService.queryAllCity(); if (cityResult == null || !cityResult.isSuccess()) { String errorMsg = cityResult == null ? "查询城市没有返回结果" : cityResult.getMsg(); // 记录查询城市日志 logQueryCityByShard(RouteJobTypeEnum.ROUTE_GEN, routeGenDTO, null, errorMsg); return Result.ofFail(ErrorEnum.QUERY_ERROR.getCode(), errorMsg); } List<String> cityList = cityResult.getData(); if (CollectionUtils.isEmpty(cityList)) { String errorMsg = "没有查询到城市,无需生成路线"; // 记录查询城市日志 logQueryCityByShard(RouteJobTypeEnum.ROUTE_GEN, routeGenDTO, null, errorMsg); return Result.ofSuccessMsg(errorMsg); } // 记录查询城市日志 logQueryCityByShard(RouteJobTypeEnum.ROUTE_GEN, routeGenDTO, JSON.toJSONString(cityList), null); Map<String, Future<Result<String>>> categoryFutureMap = new ConcurrentHashMap<>(); // 禁用的类别列表 List<String> disableCategoryList = null; String disableCategory = genProperties.getDisableCategory(); if (StringUtils.isNotBlank(disableCategory)) { String[] disableCategoryArr = disableCategory.split(","); disableCategoryList = Stream.of(disableCategoryArr).collect(Collectors.toList()); } // 根据类别生成路线 // 每个类别一个线程 for (RouteCategoryEnum routeCategory : RouteCategoryEnum.values()) { // 跳过禁用的类别 if (CollectionUtils.isNotEmpty(disableCategoryList) && disableCategoryList.contains(routeCategory.getType())) { // 记录生成日志 logGenRouteByCategory(RouteJobTypeEnum.ROUTE_GEN, routeGenDTO, routeCategory.getType(), routeCategory.getType() + "类别被禁用,无需生成"); continue; } // 根据城市和类别生成路线 Future<Result<String>> categoryFuture = routeGenService.genRouteByCategory(routeGenDTO, cityList, routeCategory.getType()); categoryFutureMap.put(routeCategory.getType(), categoryFuture); } String lastErrorMsg = null; for (Map.Entry<String, Future<Result<String>>> entry : categoryFutureMap.entrySet()) { String errorMsg = null; String category = entry.getKey(); Future<Result<String>> futureResult = entry.getValue(); if (futureResult != null) { try { if (futureResult.isCancelled()) { errorMsg = "线程被取消"; log.error("分片项{} 分片总数{} 生成 {} 路线 任务开始时间:{} 处理失败,错误描述:{}", routeGenDTO.getShardItem(), routeGenDTO.getShardTotal(), category, routeGenDTO.getStartTime(), errorMsg); continue; } Result<String> result = futureResult.get(CommonConfConstants.FUTURE_CATEGORY_WAIT_TIME, TimeUnit.SECONDS); if (result == null || !result.isSuccess()) { errorMsg = result == null ? category + "没有返回处理结果" : result.getMsg(); log.error("分片项{} 分片总数{} 生成 {} 路线 任务开始时间:{} 处理失败,错误描述:{}", routeGenDTO.getShardItem(), routeGenDTO.getShardTotal(), category, routeGenDTO.getStartTime(), errorMsg); } else { log.info("分片项{} 分片总数{} 生成 {} 路线 任务开始时间:{} 生成成功", routeGenDTO.getShardItem(), routeGenDTO.getShardTotal(), category, routeGenDTO.getStartTime(), result); } } catch (TimeoutException e) { errorMsg = category + "生成路线 线程处理超时:" + e.getMessage(); log.error("分片项{} 分片总数{} 生成 {} 路线 任务开始时间:{} 线程处理超时", routeGenDTO.getShardItem(), routeGenDTO.getShardTotal(), category, routeGenDTO.getStartTime(), e); } catch (InterruptedException e) { errorMsg = category + "生成路线 线程中断异常:" + e.getMessage(); log.error("分片项{} 分片总数{} 生成 {} 路线 任务开始时间:{} 线程中断异常", routeGenDTO.getShardItem(), routeGenDTO.getShardTotal(), category, routeGenDTO.getStartTime(), e); } catch (ExecutionException e) { errorMsg = category + "生成路线 线程执行异常:" + e.getMessage(); log.error("分片项{} 分片总数{} 生成 {} 路线 任务开始时间:{} 线程执行异常", routeGenDTO.getShardItem(), routeGenDTO.getShardTotal(), category, routeGenDTO.getStartTime(), e); } } else { errorMsg = category + "生成路线 线程返回结果为null"; log.error("分片项{} 分片总数{} 生成 {} 路线 任务开始时间:{} 处理失败,错误描述:{}", routeGenDTO.getShardItem(), routeGenDTO.getShardTotal(), category, routeGenDTO.getStartTime(), errorMsg); } // 记录生成日志 logGenRouteByCategory(RouteJobTypeEnum.ROUTE_GEN, routeGenDTO, category, errorMsg); if (StringUtils.isNotBlank(errorMsg)) { lastErrorMsg = errorMsg; } } if (StringUtils.isNotBlank(lastErrorMsg)) { return Result.ofFail(ErrorEnum.GEN_ROUTE_FAILURE.getCode(), lastErrorMsg); } return Result.ofSuccess("路线生成成功"); }
private void logQueryCityByShard(RouteJobTypeEnum routeJobTypeEnum, RouteGenDTO routeGenDTO, String cityJson, String errorMsg) {
String logFormat;
String logMsg;
if (StringUtils.isBlank(errorMsg)) {
logFormat = "查询城市成功,城市信息:%s";
logMsg = String.format(logFormat, cityJson);
} else {
logMsg = errorMsg;
}
String operationObject = routeJobTypeEnum.getType() + "_job_"
+ routeGenDTO.getShardItem() + "_"
+ TimeUtil.dateTimeToStr(routeGenDTO.getStartTime(), DateFormatConstants.DATE_TIME_COMPACT_FORMAT);
LogInfoDTO logInfoDTO = new LogInfoDTO();
logInfoDTO.setBusinessName(routeJobTypeEnum.getDesc());
logInfoDTO.setOperationName("根据分片查询城市");
logInfoDTO.setOperationObject(operationObject);
logInfoDTO.setOperationDesc(logMsg);
LogUtil.log(logInfoDTO);
}
以上是关于springboot+@async异步线程池的配置及应用的主要内容,如果未能解决你的问题,请参考以下文章
Spring Boot中有多个@Async异步任务时,记得做好线程池的隔离!