线程池工具类几种实现
Posted stubborn-dude
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池工具类几种实现相关的知识,希望对你有一定的参考价值。
一 线程池工具类
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
-
@Description 线程池工具类
*/
public class ThreadPoolUtil {/**
- 核心线程数,会一直存活,即使没有任务,线程池也会维护线程的最少数量
/
private static final int SIZE_CORE_POOL = 5;
/* - 线程池维护线程的最大数量
/
private static final int SIZE_MAX_POOL = 10;
/* - 线程池维护线程所允许的空闲时间
/
private static final long ALIVE_TIME = 2000;
/* - 线程缓冲队列
*/
private static BlockingQueuebqueue = new ArrayBlockingQueue (100);
private static ThreadPoolExecutor pool = new ThreadPoolExecutor(SIZE_CORE_POOL, SIZE_MAX_POOL, ALIVE_TIME, TimeUnit.MILLISECONDS, bqueue, new ThreadPoolExecutor.CallerRunsPolicy());
static {
pool.prestartAllCoreThreads();
}public static ThreadPoolExecutor getPool() {
return pool;
}
} - 核心线程数,会一直存活,即使没有任务,线程池也会维护线程的最少数量
测试类
import com.dashuai.cloud.consulconsumer.util.ThreadPoolUtil;
public class TestUtil {
public static void main(String[] args) {
ThreadPoolUtil.getPool().execute(new Runnable() {
@Override
public void run() {
System.out.println("线程池调用");
}
});
}
}
二 线程池支持多线程返回结果
import org.springframework.stereotype.Service;
/**
- ClassName:CommenThreadPoolUtil
- Function:线程池公共入口处理类.
*/
@Service
public class CommonThreadPoolUtil {
// 核心线程数(默认初始化为10)
private int cacheCorePoolSize = 8;
// 核心线程控制的最大数目
private int maxCorePoolSize = 160;
// 队列等待线程数阈值
private int blockingQueueWaitSize = 16;
// 核心线程数自动调整的增量幅度
private int incrementCorePoolSize = 4;
// 初始化线程对象ThreadLocal,重写initialValue(),保证ThreadLocal首次执行get方法时不会null异常
private ThreadLocal<List<Future<?>>> threadlocal = new ThreadLocal<List<Future<?>>>() {
protected List<Future<?>> initialValue() {
return new ArrayList<Future<?>>();
}
};
// 初始化线程池
private MyselfThreadPoolExecutor ThreadPool = new MyselfThreadPoolExecutor(cacheCorePoolSize, cacheCorePoolSize, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>());
/**
*
* dealTask:(线程池执行操作-包含每个进程返回结果). <br/>
* 1、运用场景:例如,需要同时校验很多不同的逻辑,依赖于获取校验结果响应给用户; 2、具体实现java类:implements
* 的Callable接口,重写call方法即可,支持返回值
*
* @author
* @param callable
* @return
*/
public Map<String, Object> dealTask(Callable<?> callable) {
try {
// 动态更改核心线程数大小
dynamicTuningPoolSize();
// 执行线程业务逻辑及获取返回结果
Future<?> result = ThreadPool.submit(callable);
// 获取当前进程的局部变量
List<Future<?>> threadLocalResult = threadlocal.get();
// 叠加主进程对应的多个进程处理结果
threadLocalResult.add(result);
// 设置最新的threadLocal变量到当前主进程
threadlocal.set(threadLocalResult);
} catch (Exception e) {
e.printStackTrace();
return errorResp("线程池发生异常-Future", null);
}
return successResp(null);
}
/**
*
* dealTask:(线程池执行操作-不包含每个进程返回结果). <br/>
* 1、运用场景:例如,不依赖于响应给用户执行结果的业务逻辑 ; 2、具体实现java类:implements
* 的Runnable接口,重写run方法,没有返回值
*
* @author
* @param runnable
* @return
*/
public Map<String, Object> dealTask(Runnable runnable) {
try {
// 动态更改核心线程数大小
dynamicTuningPoolSize();
// 执行线程业务逻辑
ThreadPool.execute(runnable);
} catch (Exception e) {
e.printStackTrace();
return errorResp("线程池发生异常", null);
}
return successResp(null);
}
/**
* obtainTaskFuture:(获取线程池执行结果:此为阻塞线程,即所有线程都执行完成才能获取结果,故应将执行时间稍长的业务逻辑先执行,
* 减少等待时间). <br/>
* 此方法只能调用一次,即调用之后清除ThreadLocal变量,以便于同一进程再次调用线程池获取最新的执行结果以及释放内存, 防止内存泄露
*
* @author
* @return
*/
public Map<String, Object> obtainTaskFuture() {
List<Future<?>> threadLocalResult = null;
try {
// 获取当前进程变量
threadLocalResult = threadlocal.get();
if (threadLocalResult == null || threadLocalResult.size() == 0) {
return errorResp("获取线程池执行结果为空", null);
} else {
return successResp(threadLocalResult);
}
} catch (Exception e) {
return errorResp("获取线程池执行结果发生异常:" + e.getMessage(), null);
} finally {
// 1、释放内存;2、防止主进程再次调用线程池方法时对结果互有影响。
threadlocal.remove();
}
}
/**
*
* dynamicTuningPoolSize:(动态改变核心线程数). <br/>
*
* @author
* @return
*/
private void dynamicTuningPoolSize() {
// 队列等待任务数(此为近似值,故采用>=判断)
int queueSize = ThreadPool.getQueueSize();
// 动态更改核心线程数大小
if (queueSize >= blockingQueueWaitSize) {
// 核心线程数小于设定的最大线程数才会自动扩展线程数
if (cacheCorePoolSize <= maxCorePoolSize) {
// 原有核心线程数
int corePoolSize = ThreadPool.getCorePoolSize();
// 将要累积的核心线程数
int currentcorePoolSize = corePoolSize + incrementCorePoolSize;
ThreadPool.setCorePoolSize(currentcorePoolSize);
ThreadPool.setMaximumPoolSize(currentcorePoolSize);
cacheCorePoolSize = currentcorePoolSize;
System.out.println("动态改变线程池大小====原核心线程池数目为:" + corePoolSize + ";现累加为:" + currentcorePoolSize);
} else {
System.out.println("动态改变线程池大小====核心线程池数目已累加为:" + cacheCorePoolSize + ";不会继续无限增加");
}
}
}
/**
* 获取核心线程数 getCacheCorePoolSize:(). <br/>
*
* @author
* @return
*/
public int getCacheCorePoolSize() {
return ThreadPool.getCorePoolSize();
}
/**
* 设置核心线程数 setCacheCorePoolSize:(). <br/>
*
* @author
* @param cacheCorePoolSize
*/
public void setCacheCorePoolSize(int cacheCorePoolSize) {
ThreadPool.setCorePoolSize(cacheCorePoolSize);
ThreadPool.setMaximumPoolSize(cacheCorePoolSize);
this.cacheCorePoolSize = cacheCorePoolSize;
}
/**
*
* successResp:(正确响应信息). <br/>
*
* @author
* @param data
* @return
*/
private Map<String, Object> successResp(Object data) {
Map<String, Object> result = new HashMap<String, Object>();
result.put("status", "0");
result.put("data", data);
return result;
}
/**
*
* errorResp:(错误响应信息). <br/>
*
* @author
* @param errorMsg
* @param data
* @return
*/
public Map<String, Object> errorResp(String errorMsg, Object data) {
Map<String, Object> result = new HashMap<String, Object>();
result.put("status", "1");
result.put("msg", errorMsg);
result.put("data", data);
return result;
}
}
创建线程池类
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MyselfThreadPoolExecutor extends ThreadPoolExecutor {
// 初始化父类构造函数及startTime
public MyselfThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
// 按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务(已执行的任务不会停止)
@Override
public void shutdown() {
super.shutdown();
}
// 尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表。在从此方法返回的任务队列中排空(移除)这些任务。并不保证能够停止正在处理的活动执行任务,但是会尽力尝试。
@Override
public List<Runnable> shutdownNow() {
return super.shutdownNow();
}
// 在执行给定线程中的给定 Runnable 之前调用的方法.可用于重新初始化ThreadLocals或者执行日志记录。
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
}
// 基于完成执行给定 Runnable 所调用的方法
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
try {
// Future<?> result = (Future<?>) r;
// "任务结果:" result.get();
} catch (Exception e) {
}
}
/**
*
* getQueueSize:(已执行的任务数). <br/>
*
* @author
* @return
*/
@Override
public long getCompletedTaskCount() {
return super.getCompletedTaskCount();
}
/**
*
* getQueueSize:(正在运行的任务数). <br/>
*
* @author
* @return
*/
@Override
public int getActiveCount() {
return super.getActiveCount();
}
/**
*
* getQueueSize:(队列等待任务数). <br/>
*
* @author
* @return
*/
public int getQueueSize() {
return getQueue().size();
}
}
测试类
public class TestUtil {
public static void main(String[] args) {
CommonThreadPoolUtil poolUtil = new CommonThreadPoolUtil();
poolUtil.dealTask(new Runnable() {
@Override
public void run() {
System.out.println("线程池调用");
}
});
}
}
三 jdk1.5之后提供工具类 Executors
工具类Executors面提供了一些静态工厂方法,生成一些常用的线程池,如下所示:
-
newCachedThreadPool:创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制(Interger. MAX_VALUE),线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
-
newFixedThreadPool:创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
-
newSingleThreadExecutor:创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
-
newScheduledThreadPool:创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
总结:除了newScheduledThreadPool的内部实现特殊一点之外,其它线程池内部都是基于 ThreadPoolExecutor 类(Executor的子类)实现的。
实现:
public class TestUtil {
public static void main(String[] args) {
ScheduledExecutorService scheduExec = Executors.newScheduledThreadPool(10);
scheduExec.schedule(new Runnable() {
@SuppressWarnings("static-access")
@Override
public void run() {
System.out.println("20秒后处理");
}
}, 20, TimeUnit.SECONDS);
}
}
周期性定时任务20秒后执行
以上是关于线程池工具类几种实现的主要内容,如果未能解决你的问题,请参考以下文章
java多线程-Executors实现的几种线程池以及Callable
newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段