线程池管理工具类

Posted cheng2839

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池管理工具类相关的知识,希望对你有一定的参考价值。

 

各位读者,大家好!

   

   我们在项目开发过程中,经常会使用线程池管理,特别是对TPS有一定要求的情况。

   线程池会自动帮助我们管理线程的创建、回收及销毁等工作,方便我们的开发。特别有异步需求、调用第三方系统且比较耗时的批量任务,都可以使用线程池很好的帮我们节省大量时间,提高用户的体验。

 

   比如:小美想从XX系统进行贷款,那么XX系统肯定需要获取小美的人行征信,还可能需要校验小美是否从此种同类的系统已经贷款,如果贷款了,那么就不能再次贷款。这里牵扯到2个第三方系统,人行征信系统 及 贷款信息汇总系统,假设从人行征信系统获取征信需要5s,取贷款信息汇总系统需要3s,那么按照正常的同步流程:先获取征信,再获取贷款信息,就一共需要8s;那么如果再加上其他各种各样的校验、第三方外调(甚至外调超时),那么加起来时间更长,对客户的体验肯定会受影响,可能就会丢失小美这样大量的客户。

    所以,我们可以使用线程池来做这件事情,让A线程去调人行征信,让B线程去获取贷款信息,让C线程去调其他第三方依赖系统....,这样,一旦任何一个线程得到不符合贷款的校验结果,就返回并通知终止其他线程。这样,客户等待的时间就成倍节约下来,理论上最好情况是等待了花费最长的那个线程的时间。不仅如此,假设所有线程校验都通过,我们需要等待以上所有线程执行完毕,才可以继续接下来的工作,这个时候我们就可以用到本次介绍的工具类中awaitTasksFinished方法即可。

    以下是该线程管理工具的完整代码:

 

  1 package com.cheng2839.utils;
  2 
  3 import org.slf4j.Logger;
  4 import org.slf4j.LoggerFactory;
  5 import org.springframework.util.CollectionUtils;
  6 
  7 import java.util.List;
  8 import java.util.concurrent.BlockingQueue;
  9 import java.util.concurrent.CompletionService;
 10 import java.util.concurrent.ExecutorCompletionService;
 11 import java.util.concurrent.ExecutorService;
 12 import java.util.concurrent.Future;
 13 import java.util.concurrent.LinkedBlockingQueue;
 14 import java.util.concurrent.RejectedExecutionHandler;
 15 import java.util.concurrent.ThreadPoolExecutor;
 16 import java.util.concurrent.TimeUnit;
 17 
 18 /**
 19  * 线程池工具类
 20  *
 21  * @author cheng2839
 22  * @date 2018年11月16日
 23  */
 24 public final class ThreadPoolFactory {
 25 
 26     private static final Logger logger = LoggerFactory.getLogger(ThreadPoolFactory.class);
 27 
 28     private static ThreadPoolFactory factory = new ThreadPoolFactory();
 29 
 30     //线程池默认配置设置
 31     private static final int CORE_POOL_SIZE = 5;
 32     private static final int MAXIMUM_POOL_SIZE = 20;
 33     private static final long KEEP_ALIVE_TIME = 60L;
 34 
 35     private ThreadPoolFactory() {
 36     }
 37 
 38     public static ThreadPoolFactory getFactory() {
 39         if (factory == null) {
 40             factory = new ThreadPoolFactory();
 41         }
 42         return factory;
 43     }
 44 
 45     /**
 46      * 创建一个默认的线程池
 47      *
 48      * @return
 49      * @author cheng2839
 50      * @date 2018年11月16日
 51      */
 52     public ExecutorService getDefaultThreadPool() {
 53         //配置最大队列容量
 54         BlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<>(Integer.MAX_VALUE);
 55         return getCustomThreadPool(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, blockingQueue);
 56     }
 57 
 58     /**
 59      * 创建一个简单的线程池
 60      *
 61      * @return
 62      * @author cheng2839
 63      * @date 2018年11月16日
 64      */
 65     public ExecutorService getSimpleThreadPool(int corePoolSize, int maximumPoolSize) {
 66         return getCustomThreadPool(corePoolSize, maximumPoolSize, KEEP_ALIVE_TIME, new LinkedBlockingQueue<Runnable>());
 67     }
 68 
 69     /**
 70      * 创建一个指定队列的线程池
 71      *
 72      * @return
 73      * @author cheng2839
 74      * @date 2018年11月16日
 75      */
 76     public ExecutorService getCustomQueueThreadPool(int corePoolSize, int maximumPoolSize, BlockingQueue<Runnable> blockingQueue) {
 77         return getCustomThreadPool(corePoolSize, maximumPoolSize, KEEP_ALIVE_TIME, blockingQueue);
 78     }
 79 
 80     /**
 81      * 创建可跟踪任务状态的执行器
 82      *
 83      * @return
 84      * @author cheng2839
 85      * @date 2018年11月16日
 86      */
 87     public ExecutorCompletionService getCompletionService(ExecutorService executorService) {
 88         return new ExecutorCompletionService(executorService);
 89     }
 90 
 91     /**
 92      * 创建一个定制化的线程池
 93      *
 94      * @return
 95      * @author cheng2839
 96      * @date 2018年11月16日
 97      */
 98     public ExecutorService getCustomThreadPool(int corePoolSize,
 99                                                int maximumPoolSize,
100                                                Long keepAliveTime,
101                                                BlockingQueue<Runnable> blockingQueue) {
102         logger.info("开始初始化线程池[corePoolSize={},maximumPoolSize={},keepAliveTime={}s]...", corePoolSize, maximumPoolSize, keepAliveTime);
103 
104         RejectedExecutionHandler rejectedExecutionHandler = (Runnable r, ThreadPoolExecutor executor) -> {
105             logger.error("线程池已满,任务被丢弃...........................");
106             return;
107         };
108         ExecutorService executorService = new ThreadPoolExecutor(corePoolSize,
109                 maximumPoolSize,
110                 keepAliveTime,
111                 TimeUnit.SECONDS,
112                 blockingQueue,
113                 rejectedExecutionHandler);
114         logger.info("初始化线程池完成!");
115         return executorService;
116     }
117 
118     /**
119      * 等待任务执行完成 并 释放连接池
120      *
121      * @param futureList
122      * @param completionService
123      * @param <V>
124      * @author cheng2839
125      * @date 2018年11月16日
126      */
127     public <V> void awaitTasksFinished(List<Future> futureList, CompletionService<V> completionService) {
128         try {
129             if (!CollectionUtils.isEmpty(futureList) && completionService != null) {
130                 logger.info("等待批量任务[{}]执行。。。", futureList.size());
131                 for (int n = 0; n < futureList.size(); n++) {
132                     Future future = completionService.take();
133                     if (future != null)
134                         future.get();
135                 }
136             }
137         } catch (Exception e) {
138             logger.error("多线程获取结果异常: {}", e);
139         }
140     }
141 
142     /**
143      * 关闭线程池
144      *
145      * @param executorService
146      * @author cheng2839
147      * @date 2018年11月16日
148      */
149     public void shutdown(ExecutorService executorService) {
150         try {
151             if (executorService != null) {
152                 logger.info("关闭线程池:{}", executorService);
153                 executorService.shutdown();
154             }
155         } catch (Exception e) {
156             if (!executorService.isTerminated()) {
157                 executorService.shutdownNow();
158             }
159         } finally {
160             try {
161                 if (executorService != null && !executorService.isShutdown()) {
162                     executorService.shutdown();
163                 }
164             } catch (Exception e) {
165                 logger.error("线程池关闭异常:{}", e);
166             }
167         }
168     }
169 
170 }

 

    运行示例略,大家可以根据自己的需求灵活运用!

 

    源码归本人所有,如需转载,请标注出处。

 

我还是忍不住放上了这张图片,万一你想和我说说话呢???

技术图片

 

 

以上是关于线程池管理工具类的主要内容,如果未能解决你的问题,请参考以下文章

如何自定义线程池工具类(ThreadPoolUtils)

如何自定义线程池工具类(ThreadPoolUtils)

如何自定义线程池工具类(ThreadPoolUtils)

JUC并发编程 共享模式之工具 线程池 JDK 提供的线程池工具类 -- ThreadPoolExecutor(关闭线程池: shutdownshutdownNow)

JUC并发编程 共享模式之工具 线程池 JDK 提供的线程池工具类 -- Executors 类创建线程池(创建ThreadPoolExecutor对象)

线程池管理工具类