深入浅出线:程池的线程回收--回收的是非核心线程吗?

Posted 小猪快跑22

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入浅出线:程池的线程回收--回收的是非核心线程吗?相关的知识,希望对你有一定的参考价值。

写这篇文章的初衷是在和同事讨论线程池中线程去等待队列里面去取任务是随机的还是有序的,什么意思呢?举个例子,我自定义了下面一个线程池,如下:

ExecutorService executorService =
                    ThreadPoolExecutor(
                        2,
                        3,
                        60_000,
                        TimeUnit.MILLISECONDS,
                        LinkedBlockingQueue(2),
                        ZjtThreadFactory()
                    )

即核心池的大小是2,最大线程数是3,等待队列的大小是2,非核心线程存活的时间是60秒

假设我现在向线程池中提交5个任务,每个任务耗时1s,那么按照线程池的原理,肯定是先创建2个线程(线程1和线程2)分别执行任务1和任务2,然后任务3和任务4放到等待队列中,然后再创建一个线程执行任务5,等1s任务结束后,线程1和线程2去等待队列中去取任务3和任务4执行。再然后就是任务执行完了,线程1和线程2和线程3都在等待任务,等待60秒之后会回收线程3

注意本例中是回收线程3并不是因为线程3是非核心线程,而是线程1和线程2执行完任务1和任务2后又去队列中取任务3和任务4执行了,线程3最先等待,所以会先回收线程3。

测试代码如下:

executorService =
    ThreadPoolExecutor(
        2,
        3,
        60_000,
        TimeUnit.MILLISECONDS,
        LinkedBlockingQueue(2),
        ZjtThreadFactory()
    )
for (i in 1..5) 
    executorService.execute 
        Log.e("zzzzz", "$Thread.currentThread().name >> 任务 $i 开始执行")
        Thread.sleep(1000)
        Log.e("zzzzz", "$Thread.currentThread().name >> 任务 $i 执行结束")
    

 

看下结果是不是这样啊:

2022-01-07 10:45:13.806 30299-31786/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-1 >> 任务 1 开始执行
2022-01-07 10:45:13.806 30299-31787/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-2 >> 任务 2 开始执行
2022-01-07 10:45:13.807 30299-31788/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-3 >> 任务 5 开始执行
2022-01-07 10:45:14.807 30299-31786/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-1 >> 任务 1 执行结束
2022-01-07 10:45:14.807 30299-31787/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-2 >> 任务 2 执行结束
2022-01-07 10:45:14.807 30299-31786/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-1 >> 任务 3 开始执行
2022-01-07 10:45:14.807 30299-31788/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-3 >> 任务 5 执行结束
2022-01-07 10:45:14.807 30299-31787/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-2 >> 任务 4 开始执行
2022-01-07 10:45:15.808 30299-31786/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-1 >> 任务 3 执行结束
2022-01-07 10:45:15.809 30299-31787/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-2 >> 任务 4 执行结束

从 log 中可以看出结果和我们上面分析的一样。

  1. 首先创建线程1 和 线程2 去执行任务1 和 任务2
  2. 任务3 和 任务4 放入等待队列
  3. 创建线程3去执行任务5
  4. 线程1和线程2执行完任务1和任务2后,去等待队列中去取任务3和任务4执行
  5. 线程3执行完任务,由于等待队列中没有任务了就开始等待
  6. 线程1和线程2执行完任务后也开始等待
  7. 60秒后回收线程3,因为线程3是最先等待的,而不是因为线程3是非核心线程。

问题一、先往上面的线程池抛5个任务,延时10秒后,我们开始每隔3秒往线程池中抛一个任务,那么线程3最后会被回收吗?

条件是:10秒后,线程1,2,3都执行完任务都在等待了,线程3最先等待,然后是线程1和2。

那么问题来了:该选择哪个线程来执行这个任务呢?是随机选一个吗?

先看测试结果。
加上下面的测试代码如下:

Thread.sleep(10_000)
Thread
    for(i in 6..10) 
        Thread.sleep(3000)
        executorService.execute 
            Log.e("zzzzz", "$Thread.currentThread().name >> 任务 $i 开始执行")
            Thread.sleep(1000)
            Log.e("zzzzz", "$Thread.currentThread().name >> 任务 $i 执行结束")
        
    
.start()

看下结果:

2022-01-07 11:52:21.263 3701-4715/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-1 >> 任务 1 开始执行
2022-01-07 11:52:21.265 3701-4716/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-2 >> 任务 2 开始执行
2022-01-07 11:52:21.265 3701-4717/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-3 >> 任务 5 开始执行
2022-01-07 11:52:22.265 3701-4715/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-1 >> 任务 1 执行结束
2022-01-07 11:52:22.265 3701-4715/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-1 >> 任务 3 开始执行
2022-01-07 11:52:22.265 3701-4716/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-2 >> 任务 2 执行结束
2022-01-07 11:52:22.265 3701-4716/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-2 >> 任务 4 开始执行
2022-01-07 11:52:22.266 3701-4717/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-3 >> 任务 5 执行结束
2022-01-07 11:52:23.266 3701-4715/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-1 >> 任务 3 执行结束
2022-01-07 11:52:23.266 3701-4716/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-2 >> 任务 4 执行结束

// 10秒之后,线程3 开始执行任务6,每隔3秒 开始轮训 是 线程1 线程2 再到 线程3

2022-01-07 11:52:34.269 3701-4717/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-3 >> 任务 6 开始执行
2022-01-07 11:52:35.270 3701-4717/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-3 >> 任务 6 执行结束
2022-01-07 11:52:37.270 3701-4715/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-1 >> 任务 7 开始执行
2022-01-07 11:52:38.271 3701-4715/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-1 >> 任务 7 执行结束
2022-01-07 11:52:40.271 3701-4716/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-2 >> 任务 8 开始执行
2022-01-07 11:52:41.273 3701-4716/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-2 >> 任务 8 执行结束
2022-01-07 11:52:43.273 3701-4717/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-3 >> 任务 9 开始执行
2022-01-07 11:52:44.274 3701-4717/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-3 >> 任务 9 执行结束
2022-01-07 11:52:46.274 3701-4715/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-1 >> 任务 10 开始执行
2022-01-07 11:52:47.275 3701-4715/com.zjt.startmodepro E/zzzzz: zjt-pool-2-thread-1 >> 任务 10 执行结束

可以看到 10 秒之后,线程3 开始执行任务6,3秒后线程1执行任务7,再3秒后 线程2执行任务8,依次轮询下去直到不再有任务插入。

即在我们的案例中,虽然线程都是空闲的,但是当任务来的时候不是随机调用的,而是轮询。那么是不是所有的都是这样轮询的呢?下面我们通过源码来分析下:

3个线程都是由于等待队列为空阻塞等待,首先是线程3getTask 获取不到任务在阻塞等待,代码如下:

 private Runnable getTask() 
        boolean timedOut = false; // Did the last poll() time out?
        for (; ; ) 
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty()))  // 注1
                decrementWorkerCount();
                Log.e("test thread pool", "--getTask  c = " + ctl.get());
                return null;
            
            int wc = workerCountOf(c);
            // allowCoreThreadTimeOut 默认为false,
            // 这里存活的线程数为3大于corePoolSize的2,所以time 为 true
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 
            if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) 
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            
           
            try 
                Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
             catch (InterruptedException retry) 
                timedOut = false;
            
        
    

通过上面对代码的注释,可以看到 线程1线程2 以及线程3通过 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 进行阻塞等待的,我这里的队列用的是 LinkedBlockingQueuepoll 方法代码如下:

public E poll(long timeout, TimeUnit unit) throws InterruptedException 
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try 
        while (count.get() == 0) 
            if (nanos <= 0L)
                return null;
            // 发现队列为空,通过 ReentrantLock 的 Condition 来实现阻塞等待线程存活时间
            nanos = notEmpty.awaitNanos(nanos);
        
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
     finally 
        takeLock.unlock();
    
    if (c == capacity)
        signalNotFull();
    return x;

notEmpty.awaitNanos(nanos) 的部分代码如下:

public final long awaitNanos(long nanosTimeout)
        throws InterruptedException 
    。。。。。。
    // 这里是会把 这个线程作为一个结点插入到等待队列,是队列当然是有序的
   // 注意,这里的等待队列不是线程池的等待队列,是AQS中的等待队列,可以看我关于AQS的文章分析
    Node node = addConditionWaiter();
    long savedState = fullyRelease(node);
    int interruptMode = 0;
    。。。。。。
    long remaining = deadline - System.nanoTime(); // avoid overflow
    return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE;

关于 ReentrantLock 以及 Condition 的源码分析可参考我的另一篇文章:
ReentrantLock 以及 Condition深度解析

上面分析知道,线程3 先等待 然后是线程1,最后是线程2所以AQS(AbstractQueuedLongSynchronizer) 中等待队列的顺序是:


等3秒之后往线程池里面抛一个任务后,就会唤醒 AQS中阻塞队列的第一个线程,即线程3被唤醒然后去执行任务。executorService.execute(runnable),因为当前存活的线程数为3大于核心线程的个数2,所以直接插入队列 workQueue.offer(command):

 public boolean offer(E e) 
     if (e == null) throw new NullPointerException();
     final AtomicInteger count = this.count;
     if (count.get() == capacity)
         return false;
     int c = -1;
     Node<E> node = new Node<E>(e);
     final ReentrantLock putLock = this.putLock;
     putLock.lock();
     try 
         if (count.get() < capacity) 
             // 插入线程池的阻塞队列
             enqueue(node);
             // 注意:getAndIncrement 的操作是先将值付给c,然后再加1,
             //所以,count = 1,但是 c = 0,然后执行后面signalNotEmpty
             c = count.getAndIncrement();
             if (c + 1 < capacity)
                 notFull.signal();
         
      finally 
         putLock.unlock();
     
     if (c == 0)
         signalNotEmpty();
     return c >= 0;
 

signalNotEmpty 代码如下:

private void signalNotEmpty() 
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try 
        notEmpty.signal(); // 唤醒AQS中阻塞的线程
     finally 
        takeLock.unlock();
    

signal以上是关于深入浅出线:程池的线程回收--回收的是非核心线程吗?的主要内容,如果未能解决你的问题,请参考以下文章

线程池梳理

Java技术专题「技术盲区」从源码来看看线程池是如何回收和维持运作线程的核心技术体系

五种线程池的分类和作用

深入分析线程池的实现原理

Android3.0以后,Asynctask在没开线程池的情况下会怎么排队执行

Java深入学习:线程池原理