Java线程池:ThreadPoolExecutor

Posted 安然_随心

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java线程池:ThreadPoolExecutor相关的知识,希望对你有一定的参考价值。

以下基于JDK 1.8 进行分析

文章目录

1 简介

实现了AbstractExecutorService 抽象类,提供了创建线程池、设置线程池属性、提交任务、停止线程池等基本操作接口。下面操作类型对 ThreadPoolExecutor 进行分析。

2. 创建线程池

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

创建线程池对象,可根据实际需要,自定义线程池特性。初始化过程,没有任何特殊化操作,只是一些基础变量的初始化。

创建完成后,也没有什么特殊的效果,刚初始化的线程池对象中不包含任何线程。

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    

备注:如果觉得使用该自定义方式创建线程池比较复杂繁琐(需要传入众多的参数),则可以使用 Executors 工具类来创建固定属性的线程池,包括:

  • newSingleThreadExecutor :创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

  • newFixedThreadPool :创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

  • newScheduledThreadPool :创建一个可定期或者延时执行任务的定长线程池,支持定时及周期性任务执行。

  • newCachedThreadPool :创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

3. 线程池运行

在创建线程池后,线程池就启动了(没有类似的start 函数,创建即启动)。但是里面没有任何线程。启动线程运行任务有两种方式:

3.1 预启动核心线程

3.1.1 prestartCoreThread

public boolean prestartCoreThread()public boolean prestartCoreThread() 
        return workerCountOf(ctl.get()) < corePoolSize &&
            addWorker(null, true);
    

如果当前线程池中的数量 小于 核心线程数,则创建一个线程,加入到线程池中,然后调用线程的start 方法,启动线程。

3.1.2 prestartAllCoreThreads

启动所有的核心线程。循环调用prestartCoreThread,直到
prestartCoreThread 调用不成功(已经达到核心线程数 或者 线程池已经关闭了)。

3.2 提交任务的方式

提交任务 和 预启动的区别就是:预启动 核心线程的时候,由于没有任务,则核心线程是处于空转的状态。并且,预启动只能启动核心线程数,当到达 核心线程数,则是没有作用的。

3.2.1 execute

    public void execute(Runnable command) 
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
         // 如果当前线程数 小于 核心线程数,则启动一个新的线程,运行该任务
        if (workerCountOf(c) < corePoolSize) 
            if (addWorker(command, true))
                return;
            c = ctl.get();
        
        
        if (isRunning(c) && workQueue.offer(command)) 
            int recheck = ctl.get();
            //如果线程池在关闭,则将任务从任务队列中删除,并执行拒绝任务流程;
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //再次判定,在这个时候,是否之前的线程死了,则新启动一个线程(避免有任务,无工作线程的情况)
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
            //其他case: 线程池处于运行状态,且当前线程数大于核心线程数,且任务队列没有满,
                       //则将任务加入到任务队列中
        
        //线程池处于运行状态,且当前线程数大于核心线程数,且任务队列满了,则尝试新启动一个线程
        //如果启动失败,则拒绝任务
        else if (!addWorker(command, false))
            reject(command);
    

3.2.2 submit 系列方法

一般形式的submit 方法如下:

    public Future<?> submit(Runnable task) 
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    

实际上,submit 方法可接收 runnable, 也可接收 callable 对象。

submit 系列方法核心 要点还是调用了execute ,只是封装了一层,可以返回future 对象,可查询任务状态、结果。

3.2.3 其他说明

创建先的工作线程可能失败的case:

  • 达到线程池限制(预启动时,是到达 核心线程数;其他情况,达到最大线程数限制);
  • 线程池已经停止了了,或者在 缓慢 shutdown 过程中。
  • 设置的线程池工厂 创建线程错误,例如系统内存溢出等情况;

3.3 工作线程

工作线程的定义:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
 

final Thread thread;
 Runnable firstTask;
 
。。。。

3.3.1 工作线程任务执行过程

工作线程继承了runnable 接口,直接看接口的run 函数:

 public void run() 
            runWorker(this);
        


    final void runWorker(Worker w) 
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try 
        	//如果初始任务不为空 或者能取到任务(任务队列不为空)
            while (task != null || (task = getTask()) != null) 
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try 
                	//默认实现是空的,可自定义为记录日志等其他功能
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try 
                        //调用任务的run函数,执行任务
                        task.run();
                     catch (RuntimeException x) 
                        thrown = x; throw x;
                     catch (Error x) 
                        thrown = x; throw x;
                     catch (Throwable x) 
                        thrown = x; throw new Error(x);
                     finally 
                    	//空函数
                        afterExecute(task, thrown);
                    
                 finally 
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                
            
            completedAbruptly = false;
         finally 
        	//退出了主循环,该线程结束。会进行任务统计,且如果线程数达不到要求,会重新启动一个工作线程。
            processWorkerExit(w, completedAbruptly);
        
    

线程池中的任务由工作线程完成。工作线程的运行逻辑:

  • 运行初始任务(如果有的话。使用预启动函数创建的工作线程没有初始任务);
  • 不停的从任务队列中取任务,调用任务的run函数,执行任务;

3.3.2 工作线程什么时候退出、运行结束

从上面代码中可以看到,工作线程一般情况下,是不会退出的,退出工作循环只有一个条件:执行抛出了异常。抛出异常又可分为两种情况:

  • 当线程池关闭时,任务会收到中断 异常;
  • 本身任务抛出异常;

4 结束线程池

ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:

4.1 shutdown()

不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务

  public void shutdown() 
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try 
            checkShutdownAccess();
            //设置线程池的状态为关闭,则不会接受新的任务
            advanceRunState(SHUTDOWN);
            //中断那些空闲的进程
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
         finally 
            mainLock.unlock();
        
        tryTerminate();
    

4.2 shutdownNow()

立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

5. 问题

5.1 工作线程

线程池中的工作线程 被放入到 一个哈HashSet中。当创建新的线程或者线程死亡时,通过锁 来修改 workers 集合。

private final HashSet<Worker> workers = new HashSet<Worker>();

 private final ReentrantLock mainLock = new ReentrantLock();

那为什么不直接使用线程安全的集合?

 * Lock held on access to workers set and related bookkeeping.
     * While we could use a concurrent set of some sort, it turns out
     * to be generally preferable to use a lock. Among the reasons is
     * that this serializes interruptIdleWorkers, which avoids
     * unnecessary interrupt storms, especially during shutdown.
     * Otherwise exiting threads would concurrently interrupt those
     * that have not yet interrupted. It also simplifies some of the
     * associated statistics bookkeeping of largestPoolSize etc. We
     * also hold mainLock on shutdown and shutdownNow, for the sake of
     * ensuring workers set is stable while separately checking
     * permission to interrupt and actually interrupting.

没有看懂文档上的说明

5.2 在停止线程池时,怎么区分那些工作线程是空闲的?

所谓忙碌的工作线程,即取到任务(从任务队列获取到任务 或者 有初始任务),准备执行的工作线程。

当工作线程有了任务后,会给自己上锁:

 final void runWorker(Worker w) 
 .......
 while (task != null || (task = getTask()) != null) 
             // 上锁
                w.lock();
               。。。。。
                

worker 继承了AbstractQueuedSynchronizer,当执行worker.lock 后,如果不执行unlock ,则其他地方是获取不到锁的。当worker 执行完取到的单个任务后,会调用 unlock 函数。

当调用shutdown 函数停止 空闲工作线程的时候,会尝试调用worker 的 trylock 函数。如上所说,如果工作线程取到了任务,则会进行lock 操作,即此时 shutdown 中,对该工作程序执行trylock 不成功,则不会发送interrupt 消息。

for (Worker w : workers) 
                Thread t = w.thread;
                // 执在执行任务的工作线程 trylock或失败
                if (!t.isInterrupted() && w.tryLock()) 
                    try 
                        t.interrupt();
                     catch (SecurityException ignore) 
                     finally 
                        w.unlock();
                    
                
                if (onlyOne)
                    break;
            

以上是关于Java线程池:ThreadPoolExecutor的主要内容,如果未能解决你的问题,请参考以下文章

线程池bing

线程池bing

Android线程管理之ThreadPoolExecutor自定义线程池

python线程池ThreadPoolExecutor与进程池ProcessPoolExecutor

面试官一个线程池问题把我问懵逼了。

复习多线程相关知识