手写一个线程池

Posted wen-pan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了手写一个线程池相关的知识,希望对你有一定的参考价值。

实现了基本的线程池功能,比如

  • 提交不需要返回结果的任务
  • 提交需要返回值的任务
  • 自定义线程池各个参数
  • 线程池中的worker执行完任务后自动去队列里获取一个任务来执行
  • 自定义拒绝策略

需要优化的功能

  • 更精细的并发控制
  • 根据线程池中worker的具体情况来判断是否要销毁当前worker
  • 当然,最正宗的还是要看 大哥李 开发的线程池

①、自定义线程池

public class MyThreadPool 

    /**
     * 线程池状态(默认0,正在运行,1线程池关闭中,2线程池终结)
     */
    private int state;
    /**
     * 阻塞队列最大容量
     */
    private int queueCapacity = 1024;
    /**
     * 核心线程数量
     */
    private int corePoolSize = 1;
    /**
     * 最大线程数量
     */
    private int maxPoolSize = Integer.MAX_VALUE;
    /**
     * 阻塞队列(里面的泛型应该是Runnable)
     */
    private BlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<>();

    /**
     * 自定义线程池拒绝策略(这里默认给一个拒绝策略)
     */
    private MyRejectedExecutionHandler rejectedExecutionHandler = new DefaultRejectedExecutionHandler();
    /**
     * 互斥锁
     */
    private final Object mutexLock = new Object();
    /**
     * 当前线程池中工作线程集合
     */
    private final List<Worker> workers = new ArrayList<>();

    /**
     * 无参构造,所有的线程池参数都使用默认值
     */
    public MyThreadPool() 
        state = 0;
    

    /**
     * 自定义线程池参数
     */
    public MyThreadPool(int queueCapacity, int corePoolSize, int maxPoolSize, MyRejectedExecutionHandler rejectedExecutionHandler) 
        state = 0;
        this.queueCapacity = queueCapacity;
        this.corePoolSize = corePoolSize;
        this.maxPoolSize = maxPoolSize;
        this.rejectedExecutionHandler = rejectedExecutionHandler;
    

    /**
     * 带返回值的任务提交
     */
    public <T> Future<T> submit(Callable<T> task) 
        if (task == null) 
            throw new NullPointerException();
        
        // 这里就不直接写了,模仿JDK的实现方式,直接使用RunnableFuture来接收返回值
        // 在RunnableFuture的run方法里,会将线程的执行结果或异常设置到outcome上
        RunnableFuture<T> future = newTaskFor(task);
        // 任然直接调用execute来执行任务
        execute(future);
        return future;
    

    /**
     * 将callable包装为一个FutureTask
     */
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) 
        return new FutureTask<T>(callable);
    

    /**
     * 执行任务的方法,不需要返回值
     */
    public void execute(Runnable task) 
        if (task == null) 
            throw new NullPointerException();
        
        // 提交任务时加互斥锁
        synchronized (mutexLock) 
            // 先判断线程池状态
            if (state == 0) 
                Worker worker = new Worker(task);
                // 当前线程池中运行的线程数量小于核心线程数
                if (workers.size() < corePoolSize) 
                    // 将worker添加到workers集合
                    workers.add(worker);
                    // 开启线程运行
                    worker.start();
                 else 
                    // 阻塞队列已满
                    if (blockingQueue.size() >= queueCapacity) 
                        // 判断是否达到最大线程数量,如果没有达到则创建新的worker来处理任务,反之则拒绝
                        if (workers.size() < maxPoolSize) 
                            workers.add(worker);
                            // 开启线程执行
                            worker.start();
                         else 
                            // 触发拒绝策略
                            rejectedExecutionHandler.rejectedExecution(task, this);
                        
                     else 
                        // 阻塞队列没满,则直接加入到队列
                        blockingQueue.add(worker);
                    
                
             else if (state == 1) 
                // 线程池处于关闭中,直接抛出异常(暂时不作处理)
                throw new RuntimeException("线程池正在关闭中,不可提交新任务");
             else 
                // 线程池已终止(暂时不作处理)
                throw new RuntimeException("线程池已经终止,不可提交新任务");
            
        
    

    /**
     * 关闭线程池
     */
    public void shutdown() 
        // 其他操作省略
        state = 1;
    

    /**
     * 立即关闭线程池(这里需要用到cas的方式来修改状态)
     */
    public void shutdownNow() 
        // 其他操作省略
        state = 2;
    

    /**
     * 从阻塞队列获取一个任务(这里如果发生InterruptedException异常,那么默认返回一个null)
     */
    private Runnable getTask() 
        try 
            // 这里使用阻塞式获取任务
            return blockingQueue.poll(100, TimeUnit.MILLISECONDS);
         catch (InterruptedException e) 
            return null;
        
    

    /**
     * 每个worker对象就是一个线程对象
     */
    class Worker extends Thread 
        /**
         * 任务
         */
        private Runnable task;

        public Worker(Runnable task) 
            this.task = task;
        

        @Override
        public void run() 
            runWorker();
        

        void runWorker() 
            try 
                // 当前任务执行完毕后,从阻塞队列里获取一个任务来执行
                while (task != null || (task = getTask()) != null) 
                    // 运行任务
                    try 
                        task.run();
                     finally 
                        // 每执行完一个任务就将task置为空
                        task = null;
                    
                
             finally 
                // worker 退出时的处理逻辑
                processWorkerExit(this);
            
        

        /**
         * worker退出时的逻辑
         */
        private void processWorkerExit(Worker w) 
            // 这里应该判断当前线程池中的worker是否超过coreSize,如果超过则做销毁多余的worker,这里为了简单就都不做了
            // 将worker移除
            workers.remove(w);
        
    


②、自定义拒绝策略

public interface MyRejectedExecutionHandler 

    /**
     * 拒绝策略
     *
     * @param r        任务
     * @param executor 线程池
     * @author Mr_wenpan@163.com 2022/3/13 12:00 下午
     */
    void rejectedExecution(Runnable r, MyThreadPool executor);

默认拒绝策略实现

public class DefaultRejectedExecutionHandler implements MyRejectedExecutionHandler 

    @Override
    public void rejectedExecution(Runnable r, MyThreadPool executor) 
        throw new RuntimeException("thread pool is full, reject task.");
    


③、测试类

public class MyThreadPoolTest 

    private static final Logger logger = LoggerFactory.getLogger("MyThreadPoolTest");

    public static void main(String[] args) throws InterruptedException 

        // 创建自定义线程池
        MyThreadPool myThreadPool = new MyThreadPool(100, 5, 20, new DefaultRejectedExecutionHandler());

        // 提交10个不需要返回值的任务到线程池中
        for (int i = 0; i < 10; i++) 
            int temp = i;
            myThreadPool.execute(() -> 
                logger.info("我是第  个提交的任务", temp);
            );
        

        List<Future<String>> futures = new ArrayList<>();
        // 提交10个能获取返回值的任务到线程池中
        for (int i = 0; i < 10; i++) 
            int temp = 10 + i;
            Future<String> future = myThreadPool.submit(() -> 
                return "我的任务编号是:" + temp;
            );
            futures.add(future);
        

        for (Future<String> future : futures) 
            try 
                logger.info(future.get());
             catch (Exception e) 
                e.printStackTrace();
            
        

        TimeUnit.SECONDS.sleep(5);

        TimeUnit.SECONDS.sleep(1000);
    

④、测试结果

16:12:53.725 [Thread-2] INFO MyThreadPoolTest - 我是第 2 个提交的任务
16:12:53.725 [Thread-3] INFO MyThreadPoolTest - 我是第 3 个提交的任务
16:12:53.725 [Thread-1] INFO MyThreadPoolTest - 我是第 1 个提交的任务
16:12:53.725 [Thread-4] INFO MyThreadPoolTest - 我是第 4 个提交的任务
16:12:53.725 [Thread-0] INFO MyThreadPoolTest - 我是第 0 个提交的任务
16:12:53.731 [Thread-2] INFO MyThreadPoolTest - 我是第 5 个提交的任务
16:12:53.731 [Thread-3] INFO MyThreadPoolTest - 我是第 6 个提交的任务
16:12:53.732 [Thread-1] INFO MyThreadPoolTest - 我是第 7 个提交的任务
16:12:53.732 [Thread-0] INFO MyThreadPoolTest - 我是第 9 个提交的任务
16:12:53.732 [Thread-4] INFO MyThreadPoolTest - 我是第 8 个提交的任务
16:12:53.748 [main] INFO MyThreadPoolTest - 我的任务编号是:10
16:12:53.748 [main] INFO MyThreadPoolTest - 我的任务编号是:11
16:12:53.748 [main] INFO MyThreadPoolTest - 我的任务编号是:12
16:12:53.748 [main] INFO MyThreadPoolTest - 我的任务编号是:13
16:12:53.748 [main] INFO MyThreadPoolTest - 我的任务编号是:14
16:12:53.748 [main] INFO MyThreadPoolTest - 我的任务编号是:15
16:12:53.748 [main] INFO MyThreadPoolTest - 我的任务编号是:16
16:12:53.748 [main] INFO MyThreadPoolTest - 我的任务编号是:17
16:12:53.748 [main] INFO MyThreadPoolTest - 我的任务编号是:18
16:12:53.748 [main] INFO MyThreadPoolTest - 我的任务编号是:19

以上是关于手写一个线程池的主要内容,如果未能解决你的问题,请参考以下文章

手写一个线程池

手写线程池

手写线程池实战

网易一面:如何设计线程池?请手写一个简单线程池?

图解线程池工作机制,手写线程池?

手写线程池 - C语言版