JUC系列Executor框架之线程池执行器

Posted 顧棟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC系列Executor框架之线程池执行器相关的知识,希望对你有一定的参考价值。

JUC系列之线程池执行器

文章目录

工厂类Executors

可以由工程类Executors创建不同的线程池执行器。

ThreadPoolExecutor

核心构造函数

当核心线程数量小于0时,或最大线程数量不大于0,或最大线程数小于核心线程数量,或当线程数超过corePoolSize时,线程空闲存活时长小于0时,将抛出IllegalArgumentException

当没有任务队列,或没有线程工厂,或没有拒绝策略时,将抛出NullPointerException

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    

参数说明

  • corePoolSize:线程池中的核心线程数量
  • maximumPoolSize:线程池中最大线程数量
  • keepAliveTime:当线程数超过corePoolSize时,线程空闲存活时长
  • unit:时间单位
  • workQueue:任务队列,被提交但是尚未执行的任务的存放处
  • threadFactory:线程工厂,用户创建线程,一般默认
  • handler:拒绝策略,任务过多或者其他原因导致线程池无法处理任务的拒绝策略。

默认线程工程方法

工厂的默认创建线程的方法

    static class DefaultThreadFactory implements ThreadFactory 
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        // 线程名的规则:pool-线程池数量-thread-线程数量
        DefaultThreadFactory() 
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        

        // 创建一个非守护型新线程,线程的优先级为一般
        public Thread newThread(Runnable r) 
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        
    

CachedThreadPool

Executors中有两个方法支持创建CachedThreadPool类型的线程池。

corePoolSize为0,maximumPoolSize为Integer.MAX_VALUE。这代表线程数最大不受限。

keepAliveTime为60L,表示当线程空闲超过60s之后,就会终止销毁。

队列使用的是SynchronousQueue,是一个无容量的阻塞队列。它的内部同时只能够容纳单个元素。每一个put操作需要等待一个take操作,否则不能继续添加元素。当提交任务的速度高于线程池中处理任务的速度,CachedThreadPool会不停的创建新线程,这样会占用过多的CPU和内存资源导致服务器无法正常使用。

    public static ExecutorService newCachedThreadPool() 
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    

与上述方法不同的是,此方法提供了一个线程构造方法对象的参数,可以通过继承接口ThreadFactory并实现newThread(Runnable r)来自定义创建线程的方法。

    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) 
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    

CachedThreadPool的execute运行示意图

FixedThreadPool

Executors中有两个方法支持创建FixedThreadPool类型的线程池。

corePoolSize和maximumPoolSize均采用参数nThreads的值;keepAliveTime为0L,表示当线程空闲之后,就会立即终止销毁。

public static ExecutorService newFixedThreadPool(int nThreads) 
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());

与上述方法不同的是,此方法提供了一个线程构造方法对象的参数,可以通过继承接口ThreadFactory并实现newThread(Runnable r)来自定义创建线程的方法。

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) 
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);

队列使用的是LinkedBlockingQueue,链表的默认和最大长度是 Integer.MAX_VALUE。使用无界队列作为工作队列会对线程池带来以下影响。

  1. 当线程池中的线程数达到corePoolSize后,新的任务将会在无界队列中等待。
  2. 由于corePoolSizemaximumPoolSize均采用参数nThreads的值,因此线程池中的线程数不会超过nThreads数量。
  3. 由于LinkedBlockingQueue是无界队列,运行中的FixedThreadPool(未执行方法shutdown()和shutdownNow())不会拒绝任务,不去调用

RejectedExecutionHandler.rejectedExecution

FixedThreadPool的execute运行示意图

SingleThreadExecutor

这是一个单个worker线程的Executor,Executors中有两个方法支持创建FixedThreadPool类型的线程池。

corePoolSize和maximumPoolSize均为1;keepAliveTime为0L,表示当线程空闲之后,就会立即终止销毁。

工作队列同样使用的是LinkedBlockingQueue。

    public static ExecutorService newSingleThreadExecutor() 
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    

与上述方法不同的是,此方法提供了一个线程构造方法对象的参数,可以通过继承接口ThreadFactory并实现newThread(Runnable r)来自定义创建线程的方法。

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) 
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));

SingleThreadExecutor的execute运行示意图

相关源码

成员变量

    //这个属性是用来存放 当前运行的worker数量以及线程池状态的
    //int是32位的,这里把int的高3位拿来充当线程池状态的标志位,后29位拿来充当当前运行worker的数量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //存放任务的阻塞队列
    private final BlockingQueue<Runnable> workQueue;
    //worker的集合,用set来存放
    private final HashSet<Worker> workers = new HashSet<Worker>();
    //历史达到的worker数最大值
    private int largestPoolSize;
    //当队列满了并且worker的数量达到maxSize的时候,执行具体的拒绝策略
    private volatile RejectedExecutionHandler handler;
    //超出coreSize的worker的生存时间
    private volatile long keepAliveTime;
    //常驻worker的数量
    private volatile int corePoolSize;
    //最大worker的数量,一般当workQueue满了才会用到这个参数
    private volatile int maximumPoolSize;
    // 当外部线程调用 awaitTermination() 方法的时候,外部线程会等待当前线程池状态为 Terminated 为止
    // 等待是如何实现的?就是将外部线程封装成waitNode放入到 Condition 队列中了,waitNode.thread 就是外部线程,会被park掉(处于WAITING 状态)
    // 当线程池状态变为 Termination的时候,会去唤醒这些线程,通过 termination.signAll() ,唤醒之后这些线程会进入到阻塞队列,头结点会去抢占mainLock
    // 抢到锁的线程会继续执行 awaitTermin() 后面的程序,这些线程最后都会正常执行
    // 简单理解,termination.await() 会将线程阻塞在这里
    //         termination.signAll() 会将阻塞在这里的线程依次唤醒
	private final ReentrantLock mainLock = new ReentrantLock();
    // 记录线程池所完成的任务总数,当worker 退出的时候会将 worker 完成的任务累计到completedTaskCount
    private long completedTaskCount;
    // 创建线程的时候会使用线程工厂,当我们使用 Executors.newFix.. newCache.. 创建线程池的时候使用的是 DefaultThreadFactory
    // 一般不建议使用 DefaultThreadFactory,推荐自己实现 ThreadFactory,为什么不推荐使用DefaultThreadFactory?
    // 因为DefaultThreadFactory创建线程的时候给赋值给线程的名字是 namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"
    // 当线程池中的某个线程出现问题的时候,并不能通过该线程名字去定位到哪个地方出现了问题,这是很浪费时间的
    private volatile ThreadFactory threadFactory;
    // 控制核心线程数量内的线程是否可以被回收 true,可以,false 不可以
    private volatile boolean allowCoreThreadTimeOut;
    // 缺省的拒绝策略:采用的是 AbortPolicy 直接抛出异常的方式
    private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

内部状态

采用一个Integer的变量ctl,将其高3位表示线程运行状态,第29位表示线程池中的线程数量。

	private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
	// 位数29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    /** 线程池容量 */
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS; // 该状态的线程池会接收新任务,并处理阻塞队列中的任务;
    private static final int SHUTDOWN   =  0 << COUNT_BITS; // 该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
    private static final int STOP       =  1 << COUNT_BITS; // 该状态的线程池不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
    private static final int TIDYING    =  2 << COUNT_BITS; // 收到终止命令,进行线程池中任务整理状态;
    private static final int TERMINATED =  3 << COUNT_BITS; // 线程池中所有的任务都已经终止;

    // Packing and unpacking ctl
	/** 获取当前线程池运行状态*/
    private static int runStateOf(int c)      return c & ~CAPACITY; 
	/** 获取当前线程池线程数量*/
    private static int workerCountOf(int c)   return c & CAPACITY; 
	/** 用在重置当前线程池 ctl 值的时候会用到 
	* rs:代表线程池状态  wc:代表当前线程池 worker(线程)数量
	*
	*
	*/
    private static int ctlOf(int rs, int wc)  return rs | wc; 

    // 比较当前线程池ctl所表示的状态,是否小于某个状态s
    // c = 1110 0000 0000 0000 0000 0000 0000 0111 < 0000 0000 0000 0000 0000 0000 0000 0000 == true
    // 所有情况下,RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
    private static boolean runStateLessThan(int c, int s) 
        return c < s;
    

	// 比较当前线程池 ctl 所表示的状态,是否大于等于某个状态
    private static boolean runStateAtLeast(int c, int s) 
        return c >= s;
    

	// 小于SHUTDOWN 一定是 RUNNING 状态 SGUTDOWN == 0
    private static boolean isRunning(int c) 
        return c < SHUTDOWN;
    

    // 使用 CAS 的方式让 ctl 的值 + 1,成功返回true,失败返回false
    private boolean compareAndIncrementWorkerCount(int expect) 
        return ctl.compareAndSet(expect, expect + 1);
    

    // 使用 CAS 的方式让 ctl 的值 -1,成功返回true,失败返回false
    private boolean compareAndDecrementWorkerCount(int expect) 
        return ctl.compareAndSet(expect, expect - 1);
    

    // 将ctl 的值减-1,这个方法一定成功
    private void decrementWorkerCount() 
        // 这里会一直进行重试,直到成功为止
        do  while (! compareAndDecrementWorkerCount(ctl.get()));
    
32 31 30 29 28 27 26 25 24 23 22 21 20 19 18 17 16 15 14 13 12 11 10 9 8 7 6 5 4 3 2 1  | 位的序号
--------------------------------------------------------------------------------------  | 
 1  1  1  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0 0 0 0 0 0 0 0 0 0  | RUNNING默认值
 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0 0 0 0 0 0 0 0 0 0  | SHUTDOWN默认值
 0  0  1  0  0  0  0  0	 0  0  0  0  0  0  0  0  0  0  0  0  0  0  0 0 0 0 0 0 0 0 0 0  | STOP默认值
 0  1  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0 0 0 0 0 0 0 0 0 0  | TIDYING默认值
 0  1  1  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0 0 0 0 0 0 0 0 0 0  | TERMINATED默认值
                                                                                        |
 0  0  0  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1 1 1 1 1 1 1 1 1 1  | CAPACITY 

内部类Worker

worker采用了AQS的独占模式。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable
    
        private static final long serialVersionUID = 6138294804551838833L;

        /** Worker 内部封装的工作线程 */
        final Thread thread;
        /** 假设 firstTask 不为空,那么当 worker 启动后(内部的线程启动)会优先执行 firstTask,当执行完firstTask后,会当workQueue中去获取下一个任务。初始化任务,只在worker 第一次执行任务的时候执行,之后都是从workqueue中获取任务执行。当新建一个线程时,会优先将当前的任务设为此线程的第一执行任务,之后才会考虑阻塞队列中的其他任务。*/
        Runnable firstTask;
        /** 记录当前所完成的任务数量 */
        volatile long completedTasks;

        /**
         * firstTask 可以为 null,为null启动后会到 queue中获取
         */
        Worker(Runnable firstTask) 
            // 设置 AQS 独占模式为初始化状态,不能被抢占锁
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            // 使用线程工厂创建了一个线程,并且将当前 worker 指定为 Runnable,也就是说当 thread启动的时候,会以worker.run()为入口
            this.thread = getThreadFactory().newThread(this);
        

    	// 当worker 启动的时候,会执行run()方法
        public void run() 
            runWorker(this);
        

        // 判断当前worker的独占锁是否被独占
        // 0 表示未被占用
        // 1 表示已经占用
        protected boolean isHeldExclusively() 
            return getState() != 0;
        

        protected boolean tryAcquire(int unused) 
            if (compareAndSetState(0, 1)) 
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            
            return false;
        

        protected boolean tryRelease(int unused) 
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        

        public void lock()         acquire(1); 
        public boolean tryLock()   return tryAcquire(1); 
        public以上是关于JUC系列Executor框架之线程池执行器的主要内容,如果未能解决你的问题,请参考以下文章

Java多线程系列--“JUC线程池”01之 线程池架构

JUC系列Executor框架之FutureTask

JUC系列Executor框架之CompletionService

JUC—Executor线程池框架源码深度解析六万字

JUC系列Executor框架之CompletionFuture

Java并发编程系列之十五:Executor框架