J.U.C之线程池-ThreadPoolExecutor

Posted chmaple

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了J.U.C之线程池-ThreadPoolExecutor相关的知识,希望对你有一定的参考价值。

ThreadPoolExecutor

JDK1.8中对与ThreadPoolExecutor是这么定义的:

/**
* An {@link ExecutorService} that executes each submitted task using
* one of possibly several pooled threads, normally configured
* using {@link Executors} factory methods.
*
* <p>Thread pools address two different problems: they usually
* provide improved performance when executing large numbers of
* asynchronous tasks, due to reduced per-task invocation overhead,
* and they provide a means of bounding and managing the resources,
* including threads, consumed when executing a collection of tasks.
* Each {@code ThreadPoolExecutor} also maintains some basic
* statistics, such as the number of completed tasks.
*/

ThreadPoolExecutor通常使用工厂方法(Executors)来配置执行实例,使用线程池中的线程来执行每一个提交的任务。ThreadPoolExecutor提供了两个主要功能:减少调用每个线程的开销,提高性能;提供了一系列方法来管理资源,监控执行。

接下来将会基于这两点和ThreadPoolExecutor的源码,对ThreadPoolExecutor进行解析:

  • 线程池的内部状态
  • ThreadPoolExecutor的构造方法和参数说明
  • 工厂方法的实现和线程池的类型
  • 线程池任务的提交、执行、中断和停止
  • 阻塞队列的选择和任务执行的策略
  • RejectedExecutionHandler任务的拒绝策略

线程池的内部状态

public class ThreadPoolExecutor extends AbstractExecutorService {
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;//高3位111 能够接收新任务,并对新添加的任务进行处理
    private static final int SHUTDOWN   =  0 << COUNT_BITS;//高3位000 不接受新的任务,但是可以对已经添加的任务进行处理
    private static final int STOP       =  1 << COUNT_BITS;//高3位001 不接收新的任务,不处理已经添加的任务,并且会中断正在处理的任务
    private static final int TIDYING    =  2 << COUNT_BITS;//高3位010 当前所有的任务已经终止,然后会执行钩子函数terminated()
    private static final int TERMINATED =  3 << COUNT_BITS;//高3位011 线程池彻底中止

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
}

变量ctl定义为AtomicInteger类型,共32位大小,记录了“线程池中的任务数量”和“线程池的状态”两个信息,其中高3位表示"线程池状态",低29位表示"线程池中的任务数量。

构造方法

ThreadPoolExecutor提供了四个重载的构造函数,先看最全的一个:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

这个构造函数一共有7个参数,每个参数的含义如下:

  • corePoolSize

线程池中核心线程的数量。在提交任务时如果线程池中核心线程的数量小于corePoolSize,线程池会新建一个线程来执行任务,直到当前线程数等于corePoolSize。

如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。

  • maximumPoolSize

线程池中允许的最大线程池数。在提交任务如果线程池的阻塞队列已满且当前线程池中核心线程的数量小于maximumPoolSize,则会新建一个线程来执行任务。

另外,如果线程池所用的BlockingQueue是无界的LinkedBlockingQueue,那么该参数将没有实际意义。

  • keepAliveTime

线程池所允许的空闲时间。线程的创建和销毁是需要代价的。线程执行完任务后不会立即销毁,而是继续存活一段时间,这段时间由keepAliveTime和unit一起决定。默认情况下,该参数只有在线程数大于corePoolSize时才会生效。

  • unit(TimeUnit)
  • workQueue(BlockingQueue<Runable>)

用来保存等待执行的任务的阻塞队列,等待的任务必须实现Runnable接口。BlockingQueue有七种实现和一种内部实现类,具体分析可以参考 https://www.cnblogs.com/CHMaple/p/9284583.html

  • threadFactory

用于设置创建线程的工厂方法,默认使用Executors.defaultThreadFactory()方法,该方法返回了一个DefaultThreadFactory的实例。通过newThread()方法提供创建线程的功能,newThread()方法创建的线程都是“非守护线程”而且“线程优先级都是Thread.NORM_PRIORITY”。

    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

Executors工厂方法也提供了另一个线程工厂方法privilegedThreadFactory,该方法返回了一个PrivilegedThreadFactory的实例,该方法创建的新线程和当前线程具有相同的权限。

   static class PrivilegedThreadFactory extends DefaultThreadFactory {
        private final AccessControlContext acc;
        private final ClassLoader ccl;

        PrivilegedThreadFactory() {
            super();
            SecurityManager sm = System.getSecurityManager();
            if (sm != null) {
                // Calls to getContextClassLoader from this class
                // never trigger a security check, but we check
                // whether our callers have this permission anyways.
                sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);

                // Fail fast
                sm.checkPermission(new RuntimePermission("setContextClassLoader"));
            }
            this.acc = AccessController.getContext();
            this.ccl = Thread.currentThread().getContextClassLoader();
        }

        public Thread newThread(final Runnable r) {
            return super.newThread(new Runnable() {
                public void run() {
                    AccessController.doPrivileged(new PrivilegedAction<Void>() {
                        public Void run() {
                            Thread.currentThread().setContextClassLoader(ccl);
                            r.run();
                            return null;
                        }
                    }, acc);
                }
            });
        }
    }

 

  • handler(RejectedExecutionHandler)

线程池的拒绝执行策略,是指将任务添加到线程池而线程池拒绝该任务时所要采取的执行策略。如果线程池的线程已经达到最大数量且阻塞队列也满了,那么线程池会根据构造函数的拒绝策略参数执行拒绝操作。

线程池默认提供了四种拒绝策略:AbortPolicy、CallerRunsPolicy、DiscardOldestPolicy、DiscardPolicy。

本文最后将会依次介绍每个策略的执行方式。

工厂方法的实现

工厂类Executors提供了三种默认的线程池:FixedThreadPool、SingleThreadPool、CachedThreadPool。

  • FixedThreadPool
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

 

  • SingleThreadPool
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

作为单一worker线程的线程池,SingleThreadExecutor把corePool和maximumPoolSize均被设置为1,和FixedThreadPool一样使用的是无界队列LinkedBlockingQueue,所以带来的影响和FixedThreadPool一样。

  • CachedThreadPool
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

CachedThreadPool的核心线程数是0,最大线程数是Integer的最大值,同时BlockingQueue使用的是SynchronousQueue,这个阻塞队列的特点是没有容量,每一个put操作必须要等待一个take操作,否则不能添加元素。

所以当新的任务到来的时候,CachedThreadPool会重用一个空闲的线程执行任务或者创建一个新的线程来执行任务。keepAliveTime和unit的值分别是60和秒,意味着空闲的进程超过60秒就会被终止。

CachedThreadPool的优点是在线程资源和系统资源充足的情况下会尽可能快的完成任务,但是缺点也很明显,当线程的处理速度跟不上主线程提交任务的速度的时候,CachedThreadPool会不断创建新的线程来执行任务,最终可能导致系统耗尽CPU和内存资源。

所以使用CachedThreadPool的时候一定要注意控制并发的任务数,否则高并发的情况下可能会导致严重的性能问题。

 













以上是关于J.U.C之线程池-ThreadPoolExecutor的主要内容,如果未能解决你的问题,请参考以下文章

那些有趣的代码 —— 有点萌的 Tomcat 的线程池

并发编程-并发容器(J.U.C)核心 AbstractQueuedSynchronizer 抽象队列同步器AQS介绍

J.U.C之AQS:阻塞和唤醒线程

并发编程-线程安全策略之并发容器(J.U.C)中的集合类

j.u.c系列(09)---之并发工具类:CyclicBarrier

ThreadPoolExecutor线程池问题,为啥没有实现Runnable接口,execute入参可以这样写?如下