(十八)Worker线程管理

Posted 醋酸菌HaC

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了(十八)Worker线程管理相关的知识,希望对你有一定的参考价值。

转自:美团技术团队

线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker。

这也是线程回收的核心

我们来看一下它的部分代码:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable
    final Thread thread;//Worker持有的线程
    Runnable firstTask;//初始化的任务,可以为null

Worker这个工作线程,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask。thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务;firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。

Worker执行任务的模型如下图所示:

线程池需要管理线程的生命周期,需要在线程长时间不运行的时候进行回收。线程池使用一张Hash表去持有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。这个时候重要的就是如何判断线程是否在运行。

Worker是通过继承AQS,使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock,而是使用AQS,为的就是实现不可重入的特性去反应线程现在的执行状态。

  1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中。
  2. 如果正在执行任务,则不应该中断线程。
  3. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。
  4. 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;如果线程是空闲状态则可以安全回收。

在线程回收过程中就使用到了这种特性,回收过程如下图所示:

2.4.2 Worker线程增加

增加线程是通过线程池中的addWorker方法,该方法的功能就是增加一个线程,该方法不考虑线程池是在哪个阶段增加的该线程,这个分配线程的策略是在上个步骤完成的,该步骤仅仅完成增加线程,并使它运行,最后返回是否成功这个结果。addWorker方法有两个参数:firstTask、core。firstTask参数用于指定新增的线程执行的第一个任务,该参数可以为空;core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize,其执行流程如下图所示:

2.4.3 Worker线程回收

线程池中线程的销毁依赖JVM自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。Worker被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。当Worker无法获取到任务,也就是获取的任务为空时,循环会结束,Worker会主动消除自身在线程池内的引用。

try 
  while (task != null || (task = getTask()) != null) 
    //执行任务
  
 finally 
  processWorkerExit(w, completedAbruptly);//获取不到任务时,主动回收自己

线程回收的工作是在processWorkerExit方法完成的。

事实上,在这个方法中,将线程引用移出线程池就已经结束了线程销毁的部分。但由于引起线程销毁的可能性有很多,线程池还要判断是什么引发了这次销毁,是否要改变线程池的现阶段状态,是否要根据新状态,重新分配线程。

2.4.4 Worker线程执行任务

在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的执行过程如下:

  1. while循环不断地通过getTask()方法获取任务。
  2. getTask()方法从阻塞队列中取任务。
  3. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。
  4. 执行任务。
  5. 如果getTask()结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。

执行流程如下图所示:

ThreadPoolExecutor源码分析

一个线程池可以接受任务类型有Runnable和Callable,分别对应了execute和submit方法。

来看看execute的方法:

public void execute(Runnable command) 
        if (command == null)
            throw new NullPointerException();
 
        int c = ctl.get();
        //第一步:如果线程数量小于核心线程数  
       // workerCountOf 是 获取活动线程数  
        if (workerCountOf(c) < corePoolSize) 
            //则启动一个核心线程执行任务  
            if (addWorker(command, true))
                return;
            c = ctl.get();
        
    //第二步:当前线程数量大于等于核心线程数,加入任务队列,成功的话会进行二次检查  
        if (isRunning(c) && workQueue.offer(command)) 
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
               //启动非核心线程执行,注意这里任务是null,其实里面会去取任务队列里的任务执行  
                addWorker(null, false);
        
    //第三步:加入不了队列(即队列满了),尝试启动非核心线程
        else if (!addWorker(command, false))
            //如果启动不了非核心线程执行,说明到达了最大线程数量的限制,拒绝
            reject(command);
    

所以还是这张图:

线程池是如何重复利用空闲的线程来执行任务的?

上面源码分析到,只要这个活动的线程数量小于设定的核心线程数,那么依旧会启动一个新线程来执行任务。

也就是说不会去复用任何线程。在execute方法里面我们没有看到线程复用的影子,那么我们继续来看看addWorker方法。

private boolean addWorker(Runnable firstTask, boolean core)   
        retry:  
        for (;;)   
            int c = ctl.get();  
            int rs = runStateOf(c);  
   
            // Check if queue empty only if necessary.  
            if (rs >= SHUTDOWN &&  
                ! (rs == SHUTDOWN &&  
                   firstTask == null &&  
                   ! workQueue.isEmpty()))  
                return false;  
   
            for (;;)   
                int wc = workerCountOf(c);  
                if (wc >= CAPACITY ||  
                    wc >= (core ? corePoolSize : maximumPoolSize))  
                    return false;  
                if (compareAndIncrementWorkerCount(c))  
                    break retry;  
                c = ctl.get();  // Re-read ctl  
                if (runStateOf(c) != rs)  
                    continue retry;  
                // else CAS failed due to workerCount change; retry inner loop  
              
          
        //前面都是线程池状态的判断,暂时不理会,主要看下面两个关键的地方  
        boolean workerStarted = false;  
        boolean workerAdded = false;  
        Worker w = null;  
        try   
            //第一处 最重要的是这里
            w = new Worker(firstTask); // 新建一个Worker对象,这个对象包含了待执行的任务,并且新建一个线程  
            final Thread t = w.thread;  
            if (t != null)   
                final ReentrantLock mainLock = this.mainLock;  
                mainLock.lock();  
                try   
                    int rs = runStateOf(ctl.get());  
   
                    if (rs < SHUTDOWN ||  
                        (rs == SHUTDOWN && firstTask == null))   
                        if (t.isAlive()) // precheck that t is startable  
                            throw new IllegalThreadStateException();  
                        workers.add(w);  
                        int s = workers.size();  
                        if (s > largestPoolSize)  
                            largestPoolSize = s;  
                        workerAdded = true;  
                      
                 finally   
                    mainLock.unlock();  
                  
                if (workerAdded)   
                    //第二处
                    t.start(); // 启动刚创建的worker对象里面的thread执行  
                    workerStarted = true;  
                  
              
         finally   
            if (! workerStarted)  
                addWorkerFailed(w);  
          
        return workerStarted;  
      

上面第一处,会new一个Worker,Worker的构造方法:

Worker(Runnable firstTask)  // worker本身实现了Runnable接口  
            setState(-1); // inhibit interrupts until runWorker  
            this.firstTask = firstTask; // 持有外部传进来的runnable任务  
            //创建了一个thread对象,并把自身这个runnable对象给了thread,一旦该thread执行start方法,就会执行worker的run方法  
            this.thread = getThreadFactory().newThread(this);   
          

上面第二处,Worker实现了Runnable接口,所以t.start会去执行worker的run方法:

        public void run() 
            runWorker(this);
        

run方法又执行了ThreadPoolExecutor的runWorker方法,把当前worker对象传入:

final void runWorker(Worker w) 
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try 
            while (task != null || (task = getTask()) != null) 
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try 
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try 
                        task.run();
                     catch (RuntimeException x) 
                        thrown = x; throw x;
                     catch (Error x) 
                        thrown = x; throw x;
                     catch (Throwable x) 
                        thrown = x; throw new Error(x);
                     finally 
                        afterExecute(task, thrown);
                    
                 finally 
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                
            
            completedAbruptly = false;
         finally 
            processWorkerExit(w, completedAbruptly);
        
    

来看一下getTask的源码:

private Runnable getTask()   
        boolean timedOut = false; // Did the last poll() time out?  
   
        for (;;)   
            int c = ctl.get();  
            int rs = runStateOf(c);  
   
            // Check if queue empty only if necessary.  
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty()))   
                decrementWorkerCount();  
                return null;  
              
   
            int wc = workerCountOf(c);  
   
            // timed变量用于判断是否需要进行超时控制。  
            // allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;  
            // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;  
            // 对于超过核心线程数量的这些线程或者允许核心线程进行超时控制的时候,需要进行超时控制  
            // Are workers subject to culling?  
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;  
   
            // 如果需要进行超时控制,且上次从缓存队列中获取任务时发生了超时(timedOut开始为false,后面的循环末尾超时时会置为true)  
            // 或者当前线程数量已经超过了最大线程数量,那么尝试将workerCount减1,即当前活动线程数减1,  
            if ((wc > maximumPoolSize || (timed && timedOut))  
                && (wc > 1 || workQueue.isEmpty()))   
                // 如果减1成功,则返回null,这就意味着runWorker()方法中的while循环会被退出,其对应的线程就要销毁了,也就是线程池中少了一个线程了  
                if (compareAndDecrementWorkerCount(c))  
                    return null;  
                continue;  
              
   
            try   
                // 注意workQueue中的poll()方法与take()方法的区别  
                //poll方式取任务的特点是从缓存队列中取任务,最长等待keepAliveTime的时长,取不到返回null  
                //take方式取任务的特点是从缓存队列中取任务,若队列为空,则进入阻塞状态,直到能取出对象为止  
                Runnable r = timed ?  
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :  
                    workQueue.take();  
                if (r != null)  
                    return r;  
                timedOut = true; // 能走到这里说明已经超时了  
             catch (InterruptedException retry)   
                timedOut = false;  
              
          
      

方法比较长,归纳起来就三步:

  1. 从worker中取出runnable;
  2. 进入while循环判断,判断当前worker中的runnable,或者通过getTask得到的runnable是否为空,不为空的情况下,就执行run;
  3. 执行完成把runnable任务置为null。

复用线程 的核心就是gettask方法:

如果当前活动线程数小于等于核心线程数(或者不允许核心线程超时),同样也是去缓存队列中取任务,但当缓存队列中没任务了,就会进入阻塞状态(队列的take方法),直到能取出任务为止(也就是队列中被新添加了任务时),因此这个线程是处于阻塞状态的,并不会因为缓存队列中没有任务了而被销毁。

这样就保证了线程池有N个线程是活的,可以随时处理任务,从而达到重复利用的目的。

综上所述,线程之所以能达到复用,就是在当前线程执行的runWorker方法中有个while循环,while循环的第一个判断条件是执行当前线程关联的Worker对象中的任务,执行一轮后进入while循环的第二个判断条件getTask(),从任务队列中取任务,取这个任务的过程要么是一直阻塞的,要么是阻塞一定时间直到超时才结束的,超时到了的时候这个线程也就走到了生命的尽头。

我们用个例子来看下:

假设我们有这么一个ThreadPoolExecutor,核心线程数设置为5(不允许核心线程超时),最大线程数设置为10,超时时间为20s,线程队列是LinkedBlockingDeque(相当于是个无界队列)。

当我们给这个线程池陆续添加任务,前5个任务执行的时候,会执行到我们之前分析的execute方法的第一步部分,会陆续创建5个线程做为核心线程执行任务,当前线程里面的5个关联的任务执行完成后,会进入各自的while循环的第二个判断getTask中去取队列中的任务,假设当前没有新的任务过来也就是没有执行execute方法,那么这5个线程就会在workQueue.take()处一直阻塞的。这个时候,我们执行execute加入一个任务,即第6个任务,这个时候会进入execute的第二部分,将任务加入到队列中,一旦加入队列,之前阻塞的5个线程其中一个就会被唤醒取出新加入的任务执行了。

所以总结就是:

复用机制跟线程池的阻塞队列有很大关系,我们可以看到,在execute在核心线程满了,但是队列不满的时候会把任务加入到队列中,一旦加入成功,之前被阻塞的线程就会被唤醒去执行新的任务,这样就不会重新创建线程了。

当缓存队列中的任务都执行完了的时候,线程池中的线程数如果大于核心线程数,就销毁多出来的线程,直到线程池中的线程数等于核心线程数。此时这些线程就不会被销毁了,它们一直处于阻塞状态,等待新的任务到来。

本文所说的“核心线程”、“非核心线程”是一个虚拟的概念,是为了方便描述而虚拟出来的概念,在代码中并没有哪个线程被标记为“核心线程”或“非核心线程”,所有线程都是一样的,只是当线程池中的线程多于指定的核心线程数量时,会将多出来的线程销毁掉,池中只保留指定个数的线程。那些被销毁的线程是随机的,可能是第一个创建的线程,也可能是最后一个创建的线程,或其它时候创建的线程。一开始我以为会有一些线程被标记为“核心线程”,而其它的则是“非核心线程”,在销毁多余线程的时候只销毁那些“非核心线程”,而“核心线程”不被销毁。这种理解是错误的。

以上是关于(十八)Worker线程管理的主要内容,如果未能解决你的问题,请参考以下文章

(十八)Worker线程管理

面渣逆袭:线程池夺命连环十八问

面渣逆袭:线程池夺命连环十八问

面渣逆袭:线程池夺命连环十八问

swoole架构分析

线程池动态自适应变化