手写一个线程池
Posted wen-pan
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了手写一个线程池相关的知识,希望对你有一定的参考价值。
实现了基本的线程池功能,比如
- 提交不需要返回结果的任务
- 提交需要返回值的任务
- 自定义线程池各个参数
- 线程池中的worker执行完任务后自动去队列里获取一个任务来执行
- 自定义拒绝策略
需要优化的功能
- 更精细的并发控制
- 根据线程池中worker的具体情况来判断是否要销毁当前worker
- 当然,最正宗的还是要看 大哥李 开发的线程池
①、自定义线程池
public class MyThreadPool
/**
* 线程池状态(默认0,正在运行,1线程池关闭中,2线程池终结)
*/
private int state;
/**
* 阻塞队列最大容量
*/
private int queueCapacity = 1024;
/**
* 核心线程数量
*/
private int corePoolSize = 1;
/**
* 最大线程数量
*/
private int maxPoolSize = Integer.MAX_VALUE;
/**
* 阻塞队列(里面的泛型应该是Runnable)
*/
private BlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<>();
/**
* 自定义线程池拒绝策略(这里默认给一个拒绝策略)
*/
private MyRejectedExecutionHandler rejectedExecutionHandler = new DefaultRejectedExecutionHandler();
/**
* 互斥锁
*/
private final Object mutexLock = new Object();
/**
* 当前线程池中工作线程集合
*/
private final List<Worker> workers = new ArrayList<>();
/**
* 无参构造,所有的线程池参数都使用默认值
*/
public MyThreadPool()
state = 0;
/**
* 自定义线程池参数
*/
public MyThreadPool(int queueCapacity, int corePoolSize, int maxPoolSize, MyRejectedExecutionHandler rejectedExecutionHandler)
state = 0;
this.queueCapacity = queueCapacity;
this.corePoolSize = corePoolSize;
this.maxPoolSize = maxPoolSize;
this.rejectedExecutionHandler = rejectedExecutionHandler;
/**
* 带返回值的任务提交
*/
public <T> Future<T> submit(Callable<T> task)
if (task == null)
throw new NullPointerException();
// 这里就不直接写了,模仿JDK的实现方式,直接使用RunnableFuture来接收返回值
// 在RunnableFuture的run方法里,会将线程的执行结果或异常设置到outcome上
RunnableFuture<T> future = newTaskFor(task);
// 任然直接调用execute来执行任务
execute(future);
return future;
/**
* 将callable包装为一个FutureTask
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
return new FutureTask<T>(callable);
/**
* 执行任务的方法,不需要返回值
*/
public void execute(Runnable task)
if (task == null)
throw new NullPointerException();
// 提交任务时加互斥锁
synchronized (mutexLock)
// 先判断线程池状态
if (state == 0)
Worker worker = new Worker(task);
// 当前线程池中运行的线程数量小于核心线程数
if (workers.size() < corePoolSize)
// 将worker添加到workers集合
workers.add(worker);
// 开启线程运行
worker.start();
else
// 阻塞队列已满
if (blockingQueue.size() >= queueCapacity)
// 判断是否达到最大线程数量,如果没有达到则创建新的worker来处理任务,反之则拒绝
if (workers.size() < maxPoolSize)
workers.add(worker);
// 开启线程执行
worker.start();
else
// 触发拒绝策略
rejectedExecutionHandler.rejectedExecution(task, this);
else
// 阻塞队列没满,则直接加入到队列
blockingQueue.add(worker);
else if (state == 1)
// 线程池处于关闭中,直接抛出异常(暂时不作处理)
throw new RuntimeException("线程池正在关闭中,不可提交新任务");
else
// 线程池已终止(暂时不作处理)
throw new RuntimeException("线程池已经终止,不可提交新任务");
/**
* 关闭线程池
*/
public void shutdown()
// 其他操作省略
state = 1;
/**
* 立即关闭线程池(这里需要用到cas的方式来修改状态)
*/
public void shutdownNow()
// 其他操作省略
state = 2;
/**
* 从阻塞队列获取一个任务(这里如果发生InterruptedException异常,那么默认返回一个null)
*/
private Runnable getTask()
try
// 这里使用阻塞式获取任务
return blockingQueue.poll(100, TimeUnit.MILLISECONDS);
catch (InterruptedException e)
return null;
/**
* 每个worker对象就是一个线程对象
*/
class Worker extends Thread
/**
* 任务
*/
private Runnable task;
public Worker(Runnable task)
this.task = task;
@Override
public void run()
runWorker();
void runWorker()
try
// 当前任务执行完毕后,从阻塞队列里获取一个任务来执行
while (task != null || (task = getTask()) != null)
// 运行任务
try
task.run();
finally
// 每执行完一个任务就将task置为空
task = null;
finally
// worker 退出时的处理逻辑
processWorkerExit(this);
/**
* worker退出时的逻辑
*/
private void processWorkerExit(Worker w)
// 这里应该判断当前线程池中的worker是否超过coreSize,如果超过则做销毁多余的worker,这里为了简单就都不做了
// 将worker移除
workers.remove(w);
②、自定义拒绝策略
public interface MyRejectedExecutionHandler
/**
* 拒绝策略
*
* @param r 任务
* @param executor 线程池
* @author Mr_wenpan@163.com 2022/3/13 12:00 下午
*/
void rejectedExecution(Runnable r, MyThreadPool executor);
默认拒绝策略实现
public class DefaultRejectedExecutionHandler implements MyRejectedExecutionHandler
@Override
public void rejectedExecution(Runnable r, MyThreadPool executor)
throw new RuntimeException("thread pool is full, reject task.");
③、测试类
public class MyThreadPoolTest
private static final Logger logger = LoggerFactory.getLogger("MyThreadPoolTest");
public static void main(String[] args) throws InterruptedException
// 创建自定义线程池
MyThreadPool myThreadPool = new MyThreadPool(100, 5, 20, new DefaultRejectedExecutionHandler());
// 提交10个不需要返回值的任务到线程池中
for (int i = 0; i < 10; i++)
int temp = i;
myThreadPool.execute(() ->
logger.info("我是第 个提交的任务", temp);
);
List<Future<String>> futures = new ArrayList<>();
// 提交10个能获取返回值的任务到线程池中
for (int i = 0; i < 10; i++)
int temp = 10 + i;
Future<String> future = myThreadPool.submit(() ->
return "我的任务编号是:" + temp;
);
futures.add(future);
for (Future<String> future : futures)
try
logger.info(future.get());
catch (Exception e)
e.printStackTrace();
TimeUnit.SECONDS.sleep(5);
TimeUnit.SECONDS.sleep(1000);
④、测试结果
16:12:53.725 [Thread-2] INFO MyThreadPoolTest - 我是第 2 个提交的任务
16:12:53.725 [Thread-3] INFO MyThreadPoolTest - 我是第 3 个提交的任务
16:12:53.725 [Thread-1] INFO MyThreadPoolTest - 我是第 1 个提交的任务
16:12:53.725 [Thread-4] INFO MyThreadPoolTest - 我是第 4 个提交的任务
16:12:53.725 [Thread-0] INFO MyThreadPoolTest - 我是第 0 个提交的任务
16:12:53.731 [Thread-2] INFO MyThreadPoolTest - 我是第 5 个提交的任务
16:12:53.731 [Thread-3] INFO MyThreadPoolTest - 我是第 6 个提交的任务
16:12:53.732 [Thread-1] INFO MyThreadPoolTest - 我是第 7 个提交的任务
16:12:53.732 [Thread-0] INFO MyThreadPoolTest - 我是第 9 个提交的任务
16:12:53.732 [Thread-4] INFO MyThreadPoolTest - 我是第 8 个提交的任务
16:12:53.748 [main] INFO MyThreadPoolTest - 我的任务编号是:10
16:12:53.748 [main] INFO MyThreadPoolTest - 我的任务编号是:11
16:12:53.748 [main] INFO MyThreadPoolTest - 我的任务编号是:12
16:12:53.748 [main] INFO MyThreadPoolTest - 我的任务编号是:13
16:12:53.748 [main] INFO MyThreadPoolTest - 我的任务编号是:14
16:12:53.748 [main] INFO MyThreadPoolTest - 我的任务编号是:15
16:12:53.748 [main] INFO MyThreadPoolTest - 我的任务编号是:16
16:12:53.748 [main] INFO MyThreadPoolTest - 我的任务编号是:17
16:12:53.748 [main] INFO MyThreadPoolTest - 我的任务编号是:18
16:12:53.748 [main] INFO MyThreadPoolTest - 我的任务编号是:19
以上是关于手写一个线程池的主要内容,如果未能解决你的问题,请参考以下文章