ThreadPoolTaskExecutor执行任务小总结

Posted yyqxxy

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ThreadPoolTaskExecutor执行任务小总结相关的知识,希望对你有一定的参考价值。

一、基础篇

 1.线程池的参数

 1> CorePoolSize  核心线程数

 2> MaxPoolSize 最大线程数

 3> QueueCapacity 队列容量

 4> KeepAliveSecond  没有任务存活时间

 5> TimeUnit  时间单位

 6> rejectedExecutionHandler 拒绝策略

 7> threadFactory一般使用默认的即可

2.拒绝策略

 1>ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。

 2>ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。

 3>ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

 4>ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

二、应用篇

1.使用@Scheduled注解实现简单定时任务,如:没5秒中打印一次come on

1>项目启动类添加@EnableScheduling注解开启

2>定时类上加@Component注解

3>定时方法上加@Scheduled(fixedDelay = 5000) 或者@Scheduled(cron = "0/3 * * * * *")

 

三、线程池的使用


@Slf4j
@Component
public class DataMarketComponent implements CommandLineRunner {




@Override
public void run(String... args) throws Exception {

dynamicTimerComponent.register("proxy_concurrent_req", new DataMarketComponent.ProxyConcurrentReq(), "30 */1 * * * ?");
}

/***
* 根据proxyType proxyid统计每分钟的请求量
*/
public class ProxyConcurrentReq implements Runnable {
@Override
public void run() {

//多线程池
ExecutorService exs = null;
try {

//代理统计对象集合

if (tableList.size() > 0){
List<ProxyConReqLog> proxyConReqLogList = new ArrayList<>();
ProxyConReqLog proxyConReqLog;
List<Future<Map<String,Map<String,Long>>>> futureMap = new ArrayList<>();
String startTimeStr = DateUtil.getStringFormat(startTime, DateUtil.DATETIMEFORMAT2);
String endTimeStr = DateUtil.getStringFormat(endTime, DateUtil.DATETIMEFORMAT2);
//开启多线程
exs = Executors.newFixedThreadPool(tableList.size());
for (String otsTableName : tableList){
if (otsTableName.contains("ods_output_proxy_")){
try {
taskId = otsTableName.replaceAll("ods_output_proxy_","").replaceAll("_","");
String tableIndex = "_index_" + taskId;
if (null != dispatcherService.getOne(new Dispatcher(taskId,null))){
//1.高速提交任务,每个任务返回一个Future入list
futureMap.add(exs.submit(new CallableTaskAgg(startTimeStr, endTimeStr, otsTableName, tableIndex)));
}
} catch (Exception e) {
log.error("提交任务发生错误", e.getMessage());
}
}
}
//所有表聚合到一起的结果
Map<String,Long> httpMap = new HashMap<>();
Map<String,Long> tcpMap = new HashMap<>();
for (Future<Map<String,Map<String,Long>>> future : futureMap) {
while (true) {
if (future.isDone()&& !future.isCancelled()) {
Map<String, Map<String, Long>> tableResult = future.get();
Map<String, Long> singleTcpMap = tableResult.get(ProxyTypeEnum.TCP.code);
Map<String, Long> singleHttpMap = tableResult.get(ProxyTypeEnum.HTTP.code);
if (null != singleTcpMap && !CollectionUtils.isEmpty(singleTcpMap)){
singleTcpMap.forEach((key,value) -> tcpMap.merge(key,value,Long::sum));
}
if (null != singleHttpMap && !CollectionUtils.isEmpty(singleHttpMap)){
singleHttpMap.forEach((key,value) -> httpMap.merge(key,value,Long::sum));
}
break;
} else {
//每次轮询休息1毫秒(CPU纳秒级),避免CPU高速轮循耗空CPU
Thread.sleep(1);
}
}
}





}
} catch (Exception e) {
log.error("run exception, message is : {} ", e.getMessage());
} finally {
exs.shutdown();
}

}
}

private String generatorId(ProxyConReqLog proxyConReqLog) {
String dataVersion = DateUtil.getDateFormat(new Date(),"yyyyMMddHHmmss");
String uuid = UUID.randomUUID().toString().replaceAll("-", "").toLowerCase();
String bn = dataVersion + uuid.substring(0,8);
return bn;

}



class CallableTaskAgg implements Callable{

String otsTableName;
String tableIndex;
String startTime;
String endTime;

public CallableTaskAgg(String startTime, String endTime, String otsTableName, String tableIndex) {
this.startTime = startTime;
this.endTime = endTime;
this.otsTableName = otsTableName;
this.tableIndex = tableIndex;
}
@Override
public Map<String,Map<String,Long>> call() {
Map map = otsQueryService.subGroupByFilter(startTime, endTime, otsTableName, tableIndex, "proxy_type", "proxy_id");
return map;
}
}
}

 

以上是关于ThreadPoolTaskExecutor执行任务小总结的主要内容,如果未能解决你的问题,请参考以下文章

ThreadPoolTaskExecutor执行任务小总结

spring ThreadPoolTaskExecutor使用总结

spring boot: 线程池ThreadPoolTaskExecutor, 多线程

Spring线程池ThreadPoolTaskExecutor学习总结

Spring线程池ThreadPoolTaskExecutor配置及详情

ThreadPoolTaskExecutor