Java并发编程--ThreadPoolExecutor

Posted 在周末

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发编程--ThreadPoolExecutor相关的知识,希望对你有一定的参考价值。

概述

  为什么要使用线程池?  

  合理利用线程池能够带来三个好处。第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。——摘自http://www.infoq.com/cn/articles/java-threadPool。

类图

   

使用

  线程池的监控

    可以通过线程池的以下属性监控线程池的当前状态:

      getTaskCount():线程池已经执行的和未执行的任务总数,因为统计的过程中可能会发生变化,该值是个近似值;

      getCompletedTaskCount():已完成的任务数量,是个近似值,该值小于等于TaskCount;

      getLargestPoolSize():线程池曾经的最大线程数量,可以通过该值判断线程池是否满过。如该数值等于线程池的最大大小,则表示线程池曾经满过;

      getPoolSize():线程池当前的线程数量;

      getActiveCount():线程池中活动的线程数(正在执行任务),是个近似值。

    还可以通过重写线程池提供的hook方法(beforeExecute、afterExecute和terminated)进行监控,例如监控任务的平均执行时间、最大执行时间和最小执行时间等。

    程序员可以通过重写钩子 hook 方法(如beforeExecute)实现ThreadPoolExecutor的扩展。

    扩展示例:添加了简单的暂停/恢复功能的子类

 1 class PausableThreadPoolExecutor extends ThreadPoolExecutor {
 2     private boolean isPaused;    //标志是否被暂停
 3     private ReentrantLock pauseLock = new ReentrantLock();    //访问isPaused时需要加锁,保证线程安全
 4     private Condition unpaused = pauseLock.newCondition();
 5 
 6     public PausableThreadPoolExecutor(...) { super(...); }
 7     
 8     //beforeExecute为ThreadPoolExecutor提供的hood方法
 9     protected void beforeExecute(Thread t, Runnable r) {
10         super.beforeExecute(t, r);
11         pauseLock.lock();
12         try {
13             while (isPaused) 
14                 unpaused.await();
15         } catch(InterruptedException ie) {
16             t.interrupt();
17         } finally {
18             pauseLock.unlock();
19         }
20     }
21     //暂停
22     public void pause() {
23         pauseLock.lock();
24         try {
25             isPaused = true;
26         } finally {
27             pauseLock.unlock();
28         }
29     }
30     //取消暂停
31     public void resume() {
32         pauseLock.lock();
33         try {
34             isPaused = false;
35             unpaused.signalAll();
36         } finally {
37             pauseLock.unlock();
38         }
39     }
40 }

实现原理

  ThreadPoolExecutor源码分析

    域

 1 //ctl是控制线程池状态的一个变量,包含有效的线程数(workerCount)和线程池的运行状态(runState)两部分信息。高3位表示runState,低29位表示workerCount。
 2 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 3 private static final int COUNT_BITS = Integer.SIZE - 3;    //表示workerCount的位数,29位。
 4 private static final int CAPACITY   = (1 << COUNT_BITS) - 1;    //线程数的上限,(2^29)-1,大约5亿
 5 
 6 // runState is stored in the high-order bits
 7 private static final int RUNNING    = -1 << COUNT_BITS;    //能接收新任务和处理队列中的任务
 8 private static final int SHUTDOWN   =  0 << COUNT_BITS;    //不能接收新任务,但可以处理队列中的任务
 9 private static final int STOP       =  1 << COUNT_BITS;    //不能接收新任务,不能处理队列中的任务,中断正在执行的任务
10 private static final int TIDYING    =  2 << COUNT_BITS;    //所有的线程都被终止,workerCount为0时会进入该状态.
11 private static final int TERMINATED =  3 << COUNT_BITS;    //terminated()方法完成后将进入该状态。

      以上ThreadPoolExecutor的成员变量表示线程池的状态,状态信息存储在ctl变量中,ctl包含有效线程数(workerCount)和线程池运行状态(runState)两部分信息,ctl的高3位表示runState,低29位表示workerCount。ctl初始值为RUNNING状态且线程数为0。

      线程池运行状态的转换如下:

        1)线程池在RUNNING状态下调用shutdown()方法会进入到SHUTDOWN状态,(finalize()方法也会调用shutdownNow())。

        2)在RUNNING和SHUTDOWN状态下调用 shutdownNow() 方法会进入到STOP状态。

        3)在SHUTDOWN状态下,当阻塞队列为空且线程数为0时进入TIDYING状态;在STOP状态下,当线程数为0时进入TIDYING状态。

        4)在TIDYING状态,调用terminated()方法完成后进入TERMINATED状态。

 

 1 //阻塞队列
 2 private final BlockingQueue<Runnable> workQueue;
 3 //可重入锁。访问woker线程和相关记录信息时需要获取该锁
 4 private final ReentrantLock mainLock = new ReentrantLock();
 5 //包含全部worker线程集合,Accessed only under mainLock,HashSet是非线程安全的.
 6 private final HashSet<Worker> workers = new HashSet<Worker>();
 7 private final Condition termination = mainLock.newCondition();
 8 //记录最大的线程数量,Accessed only under mainLock.
 9 private int largestPoolSize;
10 //完成任务的数量,Accessed only under mainLock.
11 private long completedTaskCount;
12 
13 
14 //以下所有程序员可以控制的参数都被声明为volatile变量,保证可见性。
15 
16 //创建线程的工厂
17 private volatile ThreadFactory threadFactory;
18 //线程池饱和或关闭时的处理策略(提供了四种饱和策略)
19 private volatile RejectedExecutionHandler handler;
20 //超出corePoolSize数量的空闲线程存活时间(allowCoreThreadTimeOut=true时有效)
21 private volatile long keepAliveTime;
22 //allowCoreThreadTimeOut=false,线程不会因为空闲时间超过keepAliveTime而被停止
23 private volatile boolean allowCoreThreadTimeOut;
24 //核心线程数
25 private volatile int corePoolSize;
26 //最大线程数,此变量的最大上限为CAPACITY
27 private volatile int maximumPoolSize;

      一、线程池核心线程数和最大线程数

        ThreadPoolExecutor 将根据 corePoolSize (核心线程数)和 maximumPoolSize(最大线程数)设置的边界自动调整线程池大小。当新任务在方法 execute(java.lang.Runnable) 中提交时,如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程。如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。在大多数情况下,核心和最大池大小仅基于构造函数来设置,不过也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。

 

      二、任务队列

        workQueue是一个阻塞队列,用来存储执行的任务。所有的BlockingQueue都可用于workQueue。

          如果有效的线程数小于 corePoolSize,则线程池首选添加新线程,而不进行排队。

          如果有效的线程数大于等于 corePoolSize,则线程池首选将任务加入队列,而不添加新的线程。 

          如果队列已满,则创建新的线程,当线程数超出 maximumPoolSize 时,任务将被拒绝。

        常用的三种阻塞队列的实现:

          1)直接提交。SynchronousQueue是一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。它将任务直接提交给线程而不存储任务。直接提交通常要求不限制 maximumPoolSizes 以避免拒绝新提交的任务。Executors.newCachedThreadPool使用了这个队列。

          2)无界队列。LinkedBlockingQueue是一个基于链表结构的阻塞队列,默认的大小是Integer.MAX_VALUE。创建的线程就不会超过 corePoolSize,会使maximumPoolSize 的值无效。

          3)有界队列。ArrayBlockingQueue是一个基于数组结构的有界阻塞队列。有助于防止资源耗尽,但是可能较难调整和控制。

      三、饱和策略

        当 Executor 已经关闭,或者 Executor 将有限边界用于最大线程和工作队列容量且已经饱和时,在方法 execute(Runnable) 中提交的新任务将被拒绝。线程池提供了4种饱和策略:

          1)AbortPolicy。默认的饱和策略,直接抛出RejectedExecutionException异常。

          2)CallerRunsPolicy。用调用者所在的线程来执行任务,此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。

          3)DiscardPolicy。直接丢弃任务。

          4)DiscardOldestPolicy。如果执行程序尚未关闭,则丢弃阻塞队列中最靠前的任务,然后重试执行新任务(如果再次失败,则重复此过程)。

        也可以使用自定义的 RejectedExecutionHandler 类,但需要非常小心,尤其是当策略仅用于特定容量或排队策略时。

      四、threadFactory

        使用 ThreadFactory 创建新线程,默认情况下在同一个 ThreadGroup 中一律使用 Executors.defaultThreadFactory() 创建线程,这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。通过自定义的 ThreadFactory创建新线程,可以改变线程的名称、线程组、优先级、守护进程状态等。

      五、workers用来存储工作线程,注意HashSet<Worker>是非线程安全的,访问时需要获取mainLock;

      六、mainLock是一个独占式可重入锁,用来保证访问workers和其他监控变量(如largestPoolSize、completedTaskCount等)的线程安全。

      七、keepAliveTime为线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。allowCoreThreadTimeout变量表示是否允许核心线程超时,如果allowCoreThreadTimeOut=false,那么当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize;如果allowCoreThreadTimeOut=true,那么当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=0。

    执行任务(execute)

 1 public void execute(Runnable command) {
 2     if (command == null)
 3         throw new NullPointerException();
 4     /*
 5      * Proceed in 3 steps:
 6      *
 7      * 1. If fewer than corePoolSize threads are running, try to
 8      * start a new thread with the given command as its first
 9      * task.  The call to addWorker atomically checks runState and
10      * workerCount, and so prevents false alarms that would add
11      * threads when it shouldn\'t, by returning false.
12      *
13      * 2. If a task can be successfully queued, then we still need
14      * to double-check whether we should have added a thread
15      * (because existing ones died since last checking) or that
16      * the pool shut down since entry into this method. So we
17      * recheck state and if necessary roll back the enqueuing if
18      * stopped, or start a new thread if there are none.
19      *
20      * 3. If we cannot queue task, then we try to add a new
21      * thread.  If it fails, we know we are shut down or saturated
22      * and so reject the task.
23      */
24     int c = ctl.get();    //获取线程池的状态(runState和workerCount)
25     //如果线程数小于corePoolSize,新建一个线程执行该任务。
26     if (workerCountOf(c) < corePoolSize) {
27         if (addWorker(command, true))
28             return;
29         c = ctl.get();
30     }
31     //如果线程池是运行状态,并且添加任务到队列成功(队列未满)
32     if (isRunning(c) && workQueue.offer(command)) {
33         int recheck = ctl.get();
34         //再次判断线程池的运行状态,如果不是运行状态,需要从队列删除该任务。使用拒绝策略处理该任务。
35         if (! isRunning(recheck) && remove(command))
36             reject(command);
37         //如果线程数为0,执行addWorker方法。参数为null的原因是任务已经加入到队列,新建的线程从队列取任务执行即可。
38         else if (workerCountOf(recheck) == 0)
39             addWorker(null, false);
40     }
41     //线程池不是RUNNING状态或队列已满,尝试新建一个线程执行该任务。如果失败则拒绝该任务。
42     else if (!addWorker(command, false))
43         reject(command);
44 }

     新增线程(addWorker)

      线程被封装在Worker类中。

 1 //参数firstTask表示新建线程执行的第一个任务。如果firstTask为null,表示
 2 //如果参数core=true,把corePoolSize作为线程数上限的判断条件;如果为false,把maximumPoolSize作为线程数上限的判断条件
 3 private boolean addWorker(Runnable firstTask, boolean core) {
 4     retry:
 5     for (;;) {
 6         int c = ctl.get();
 7         int rs = runStateOf(c);
 8         /*
 9          * rs >= SHUTDOWN表示不再接受新任务。 
10          * 1)线程池的运行状态为SHUTDOWN;2)firstTask == null;3)阻塞队列不为空,只有这三个条件同时满足才不返回false
11          */
12         // Check if queue empty only if necessary.
13         if (rs >= SHUTDOWN &&
14             ! (rs == SHUTDOWN &&
15                firstTask == null &&
16                ! workQueue.isEmpty()))
17             return false;
18         
19         //自旋CAS递增workerCount
20         for (;;) {
21             int wc = workerCountOf(c);
22             //如果线程数超过上限,返回false。如果参数core=true,把corePoolSize作为线程数上限的判断条件;如果为false,把maximumPoolSize作为线程数上限的判断条件
23             if (wc >= CAPACITY ||
24                 wc >= (core ? corePoolSize : maximumPoolSize))
25                 return false;
26             //CAS递增线程数。如果成功,跳出最外层循环;如果失败,且运行状态没有改变,继续内层循环直到成功。
27             if (compareAndIncrementWorkerCount(c))
28                 break retry;
29             //判断runState是否改变,如果改变则继续外层循环
30             c = ctl.get();  // Re-read ctl
31             if (runStateOf(c) != rs)
32                 continue retry;
33             // else CAS failed due to workerCount change; retry inner loop
34         }
35     }
36     
37     //走到这说明需要新建线程,且workerCount更新成功
38     //下面是新建Worker的过程。
39     boolean workerStarted = false;    //新建的Worker是否启动标识
40     boolean workerAdded = false;    //新建的Worker是否被添加到workers标识
41     Worker w = null;
42     try {
43         final ReentrantLock mainLock = this.mainLock;
44         w = new Worker(firstTask);    //新建Worker
45         final Thread t = w.thread;
46         //什么情况下线程会为null呢?在ThreadFactory创建线程失败时可能会出现。
47         if (t != null) {
48             mainLock.lock();    //获取mainLock锁。对workers(HashSet非线程安全)和largestPoolSize更新必须加锁
49             try {
50                 // Recheck while holding lock.
51                 // Back out on ThreadFactory failure or if
52                 // shut down before lock acquired.
53                 int c = ctl.get();
54                 int rs = runStateOf(c);
55                 /*
56                  *    如果运行状态是RUNNING,或者运行状态是SHUTDOWN且firstTask为null,才将新建的Worker添加到workers
57                  */
58                 if (rs < SHUTDOWN ||
59                     (rs == SHUTDOWN && firstTask == null)) {
60                     if (t.isAlive()) // precheck that t is startable
61                         throw new IllegalThreadStateException();
62                     workers.add(w);
63                     //更新largestPoolSize,标识线程池曾经出现过的最大线程数
64                     int s = workers.size();
65                     if (s > largestPoolSize)
66                         largestPoolSize = s;
67                     workerAdded = true;
68                 }
69             } finally {
70                 mainLock.unlock();    //释放mainLock锁
71             }
72             if (workerAdded) {
73                 //启动线程
74                 t.start();
75                 workerStarted = true;
76             }
77         }
78     } finally {
79         //新建的Worker未启动,进行失败处理
80         if (! workerStarted)
81             addWorkerFailed(w);
82     }
83     return workerStarted;
84 }

    Worker类

      每个线程被封装为一个Worker类实例。Worker类继承了AbstractQueuedSynchronizer,并实现了一个互斥非重入锁。Worker类同时继承了Runnable,Worker类的实例也是一个线程。

 1 private final class Worker
 2     extends AbstractQueuedSynchronizer
 3     implements Runnable
 4 {
 5     /**
 6      * This class will never be serialized, but we provide a
 7      * serialVersionUID to suppress a javac warning.
 8      */
 9     private static final long serialVersionUID = 6138294804551838833L;
10 
11     /** Thread this worker is running in.  Null if factory fails. */
12     final Thread thread;    //处理任务的线程
13     /** Initial task to run.  Possibly null. */
14     Runnable firstTask;        //传入的任务
15     /** Per-thread task counter */
16     volatile long completedTasks;    //完成的任务数
17 
18     /**
19      * Creates with given first task and thread from ThreadFactory.
20      * @param firstTask the first task (null if none)
21      */
22     Worker(Runnable firstTask) {
23         //同步状态初始化为-1,在执行runWorker方法前禁止中断当前线程
24         setState(-1); // inhibit interrupts until runWorker 
25         this.firstTask = firstTask;
26         this.thread = getThreadFactory().newThread(this);    //通过ThreadFactory创建线程
27     }
28 
29     /** Delegates main run loop to outer runWorker  */
30     public void run() {
31         runWorker(this);
32     }
33 
34     // Lock methods
35     //
36     // The value 0 represents the unlocked state.
37     // The value 1 represents the locked state.
38     //实现了一个非重入互斥锁,state=0表示解锁状态,state=1表示加锁状态
39     protected boolean isHeldExclusively() {
40         return getState() != 0;
41     }
42 
43     protected boolean tryAcquire(int unused) {
44         if (compareAndSetState(0, 1)) {
45             setExclusiveOwnerThread(Thread.currentThread());
46             return true;
47         }
48         return false;
49     }
50 
51     protected boolean tryRelease(int unused) {
52         setExclusiveOwnerThread(null);
53         setState(0);
54         return true;
55     }
56 
57     public void lock()        { acquire(1); }
58     public boolean tryLock()  { return tryAcquire(1); }
59     public void unlock()      { release(1); }
60     public boolean isLocked() { return isHeldExclusively(); }
61 
62     void interruptIfStarted() {
63         Thread t;
64         if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
65             try {
66                 t.interrupt();
67             } catch (SecurityException ignore) {
68             }
69         }
70     }
71 }

    runWorker方法

 1 final void runWorker(Worker w) {
 2     Thread wt = Thread.currentThread();
 3     Runnable task = w.firstTask;
 4     w.firstTask = null;
 5     //Worker初始化时同步状态置为-1,此处进行解锁操作目的是将同步状态置为0,允许中断。
 6     w.unlock(); // allow interrupts
 7     boolean completedAbruptly = true;    //是否因为异常跳出循环
 8     try {
 9         //如果firstTask为null则通过getTask()方法从队列中获取。
10         //正常情况下,会一直执行While循环,如果队列为空,getTask()方法中会阻塞当前线程,getTask()返回null时会跳出循环
11         while (task != null || (task = getTask()) != null) {
12             w.lock();    //加Worker锁
13             // If pool is stopping, ensure thread is interrupted;
14             // if not, ensure thread is not interrupted.  This
15             // requires a recheck in second case to deal with
16             // shutdownNow race while clearing interrupt
17             /*
18              * 如果线程池正在停止,要保证当前线程是中断状态
19              * 如果不是,则要保证当前线程不是中断状态
20              *  STOP状态要中断线程池中的所有线程,而这里使用Thread.interrupted()来判断是否中断是为了确保在RUNNING或者SHUTDOWN状态时线程是非中断状态的,因为Thread.interrupted()方法会复位中断的状态。
21              */
22             if ((runStateAtLeast(ctl.get(), STOP) ||
23                  (Thread.interrupted() &&
24                   runStateAtLeast(ctl.get(), STOP))) &&
25                 !wt.isInterrupted())
26                 wt.interrupt();
27             tryJava并发编程之美

『死磕Java并发编程系列』并发编程工具类之CountDownLatch

Java并发编程:Synchronized及其实现原理

Java并发指南开篇:Java并发编程学习大纲

Java并发编程:Synchronized及其实现原理

『死磕Java并发编程系列』并发编程工具类之CountDownLatch