浅析ThreadPoolExecutor

Posted 乔不思

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了浅析ThreadPoolExecutor相关的知识,希望对你有一定的参考价值。

ThreadPoolExecutor是jdk自带的线程池实现,其中他有四种常用的线程池模式通过ExecutorService获取:newSingleThreadExecutor,newSingleThreadExecutor,newCachedThreadPool,newFixedThreadPool,这四中是jdk自己经过对ThreadPoolExecutor的封装实现不同的线程池类型,今天我们来大概理解下ThreadPoolExecutor。

线程池的构建与接收任务

这块就大概阐述下,好多博客都在说这个。
线程初始化,通过int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler
这些参数,其中
- corePoolSize 是线程池的核心线程数。
- maxPoolSize 是最大线程数
- keepAliveTime 线程池的空闲线程保持活跃的时间,前提是线程池的线程数大于corePoolSize,设置这个的目的是为了让jvm不来回的创建线程,已创建线程的任务已经提交归还给了线程池,那么这个线程不会里面走到terminal,而是会等待keepAliveTime的时间,看看是不是有新的任务进来,如果有新的任务,就直接拿取任务去执行,如果没有,则过KeepAliveTime的时间就自动消亡。
- unit 标识keepAliveTime的时间单位
- workQueue 当线程池的线程数已经大于coolPoolSize但是小于maxPoolSize的时候,就会先把线程先offer进阻塞队列,等到有空闲现成的时候就去阻塞队列中取。
- threadFactory 线程工厂,可不传,设定现成的创建方式 ,有默认的线程工厂
- handler 线程池的拒绝策略或者也叫抛弃策略,当线程数达到最大线程数或者线程池已经执行了shutDown或者shutDownNow方法到达shutdown,stop或者terminated状态的时候,就会执行这个策略,有四种:AbortPolicy 直接抛RejectedExecutionException;DiscardPolicy 直接丢弃该任务;DiscardOldestPolicy 丢弃等待队列中最老的任务;CallerRunsPolicy 创建一个新的线程执行该任务(当线程池没有执行shutdown()或者shutdownDown()方法的时候,也就是说处于运行中的状态)。

接受线程:
当一个任务到来时,首先会去判断当前线程池的poolSize是不是大于corePoolSize,如果小于,则直接创建新的线程执行;当poolSize达到corePoolSize,就会直接往workQueue中放,当workQueue存放满了,就会判断poolSize是不是小于maxPoolSize,如果小于,则创建新的线程,如果大于,则执行抛弃策略。

具体看ThreadPoolExecutor的execute方法jdk1.8

“`java
public void execute(Runnable command)
if (command == null)
throw new NullPointerException();
//先去判断poolSize和corePoolSize的大小,|| 是短路运算符,首次添加肯定会走后面的addIfUnderCorePoolSize方法
//这个方法就是党poolSize小于corePoolSize的时候直接创建线程执行任务
//当poolSize >=corePoolSize的时候就直接执行workQueue.offer(command) 这个方法,如果存放失败,则直接返回
//false,则执行addIfUnderMaximumPoolSize方法,这个会判断是不是poolSize是不是大于maxPoolSize,如果大于则
//返回false,不大于则创建线程执行任务返回true.
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
if (runState == RUNNING && workQueue.offer(command))
if (runState != RUNNING || poolSize == 0)
//保证线程池执行了shutDown或者shutDownNow,不接受新的任务。如果是shutDownNow则直接执行抛弃策略,如果是shutDown则让保证至少有一个线程,把队列中待执行的任务执行完。
ensureQueuedTaskHandled(command);

else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated


这里就不帖 具体的方法代码了,可以自己看下。

execute方法和submit方法

execute方法是ThreadPoolExecutor实现接口Executor的方法,而submit则是实现接口ExecutorService的方法,ExecutorService描述了线程池的生命周期 ExecutorService继承了Executor。有点绕。。 自己看源码

在ThreadPoolExecutor中是找不到submit方法的,发现是在其父抽象类AbstractExecutorService中实现的,见下面代码:
“`java

public Future<?> submit(Runnable task) 
    if (task == null) throw new NullPointerException();
    RunnableFuture<Object> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;


public <T> Future<T> submit(Runnable task, T result) 
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;


public <T> Future<T> submit(Callable<T> task) 
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;

可以看见submit是支持Callable和Runnable参数的,并且有返回值。这个返回值刚好是一个RunnableFutrue,RunnableFutrue具有Runnable和Future的特性,也就是说能作为线程入参并且可以带返回值,返回值通过Futrue的get方法得到,代码中使用newTaskFor方法实现Callable,Runable到RunnableFutrue的转换,可以理解为newTaskFor方法就是一个适配器,将Runnable和Callable参数适配成为RunnableFutrue对象,转换完成后然后调用ThreadPoolExecutor的execute方法,则为上面execute方法,可见 execute没有返回值,并且接受的参数是Runnable对象。

shutDown和shutDownNow方法

ThreadPoolExecutor有四种状态:
-static final int RUNNING = 0;
-static final int SHUTDOWN = 1;
-static final int STOP = 2;
-static final int TERMINATED = 3;
转化关系:
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TERMINATED
* When both queue and pool are empty
* STOP -> TERMINATED
* When pool is empty

当线程池调用了shutDown方法,线程池不会里面停止,只是将线程状态转化为SHUTDOWN状态,并且尝试中断正在执行中的线程,线程中断调用的是Thread的interrupt方法(该方法只有在中断监听方法 wait,park,sleep等才能中断线程,其他时候不会中断线程,只是修改了现成的中断标识为true,而不会停止线程的执行) 然后会等到workQueue中的任务都被执行完才会到达TERMINATED状态。
具体代码:

“`java
public void shutdown()
/*
* Conceptually, shutdown is just a matter of changing the
* runState to SHUTDOWN, and then interrupting any worker
* threads that might be blocked in getTask() to wake them up
* so they can exit. Then, if there happen not to be any
* threads or tasks, we can directly terminate pool via
* tryTerminate. Else, the last worker to leave the building
* turns off the lights (in workerDone).
*
* But this is made more delicate because we must cooperate
* with the security manager (if present), which may implement
* policies that make more sense for operations on Threads
* than they do for ThreadPools. This requires 3 steps:
*
* 1. Making sure caller has permission to shut down threads
* in general (see shutdownPerm).
*
* 2. If (1) passes, making sure the caller is allowed to
* modify each of our threads. This might not be true even if
* first check passed, if the SecurityManager treats some
* threads specially. If this check passes, then we can try
* to set runState.
*
* 3. If both (1) and (2) pass, dealing with inconsistent
* security managers that allow checkAccess but then throw a
* SecurityException when interrupt() is invoked. In this
* third case, because we have already set runState, we can
* only try to back out from the shutdown as cleanly as
* possible. Some workers may have been killed but we remain
* in non-shutdown state (which may entail tryTerminate from
* workerDone starting a new worker to maintain liveness.)
*/

SecurityManager security = System.getSecurityManager();
if (security != null)
        security.checkPermission(shutdownPerm);

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try 
        if (security != null)  // Check if caller can modify our threads
            for (Worker w : workers)
                security.checkAccess(w.thread);
        

        int state = runState;
        if (state < SHUTDOWN)
        //修改状态为SHUTDOWN
            runState = SHUTDOWN;

        try 
            for (Worker w : workers) 
            //尝试中断线程
                w.interruptIfIdle();
            
         catch (SecurityException se)  // Try to back out
            runState = state;
            // tryTerminate() here would be a no-op
            throw se;
        

//如果poolSize为0,但是workQueue中还有任务,则修改状态为RUNNING,开启一个线程执行workQueue中的任务,如果workQueue为空,则直接修改线程池状态为TERMINATED
tryTerminate(); // Terminate now if pool and queue empty
finally
mainLock.unlock();


可以看出shutDown没有返回值,并且是优雅停线程池
shutDownNow的方法:
“`java
public List shutdownNow()
/*
* shutdownNow differs from shutdown only in that
* 1. runState is set to STOP,
* 2. all worker threads are interrupted, not just the idle ones, and
* 3. the queue is drained and returned.
*/
SecurityManager security = System.getSecurityManager();
if (security != null)
security.checkPermission(shutdownPerm);

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try 
        if (security != null)  // Check if caller can modify our threads
            for (Worker w : workers)
                security.checkAccess(w.thread);
        

        int state = runState;
        if (state < STOP)
        //修改状态为STOP
            runState = STOP;

        try 
            for (Worker w : workers) 
            //尝试中断
                w.interruptNow();
            
         catch (SecurityException se)  // Try to back out
            runState = state;
            // tryTerminate() here would be a no-op
            throw se;
        
        //得到workQueue的所有任务
        List<Runnable> tasks = drainQueue();
        tryTerminate(); // Terminate now if pool and queue empty
        //但会任务队列
        return tasks;
     finally 
        mainLock.unlock();
    

则是直接修改线程状态为STOP,尝试中指所有线程,和shutDown一样
得到workQueue的所有任务,并且返回,如果状态大于RUNNING (SHUTDOWN,STOP,TERMIANTED)则线程池不在接收新的任务。显然shutDownNow不够优雅,谨慎选择。

tryTerminal方法:

“`java
private void tryTerminate()
if (poolSize == 0)
int state = runState;
//shutDown走这里会先去判断workQueue是否为空
if (state < STOP && !workQueue.isEmpty())
state = RUNNING; // disable termination check below
Thread t = addThread(null);
if (t != null)
t.start();

//shutDownNow直接走这里,直接TERMINATED
if (state == STOP || state == SHUTDOWN)
runState = TERMINATED;
termination.signalAll();
terminated();


ThreadPoolExecutor的Worker

Worker类可以看作是ThreadPoolExecuter线程池中受理任务的活跃线程。
“`java
private final class Worker implements Runnable
/**
* The runLock is acquired and released surrounding each task
* execution. It mainly protects against interrupts that are
* intended to cancel the worker thread from instead
* interrupting the task being run.
*/
private final ReentrantLock runLock = new ReentrantLock();

    /**
     * Initial task to run before entering run loop. Possibly null.
     */
    private Runnable firstTask;

    /**
     * Per thread completed task counter; accumulated
     * into completedTaskCount upon termination.
     */
    volatile long completedTasks;

    /**
     * Thread this worker is running in.  Acts as a final field,
     * but cannot be set until thread is created.
     */
    Thread thread;

    Worker(Runnable firstTask) 
        this.firstTask = firstTask;
    

    boolean isActive() 
        return runLock.isLocked();
    

    /**
     * Interrupts thread if not running a task.
     */
    void interruptIfIdle() 
        final ReentrantLock runLock = this.runLock;
        if (runLock.tryLock()) 
            try 
        if (thread != Thread.currentThread())
        thread.interrupt();
             finally 
                runLock.unlock();
            
        
    

    /**
     * Interrupts thread even if running a task.
     */
    void interruptNow() 
        thread.interrupt();
    

    /**
     * Runs a single task between before/after methods.
     */
    private void runTask(Runnable task) 
        final ReentrantLock runLock = this.runLock;
        runLock.lock();
        try 
            /*
             * Ensure that unless pool is stopping, this thread
             * does not have its interrupt set. This requires a
             * double-check of state in case the interrupt was
             * cleared concurrently with a shutdownNow -- if so,
             * the interrupt is re-enabled.
             */
            if (runState < STOP &&
                Thread.interrupted() &&
                runState >= STOP)
                thread.interrupt();
            /*
             * Track execution state to ensure that afterExecute
             * is called only if task completed or threw
             * exception. Otherwise, the caught runtime exception
             * will have been thrown by afterExecute itself, in
             * which case we don't want to call it again.
             */
            boolean ran = false;
            //待实现方法,子类实现,
            beforeExecute(thread, task);
            try 
                task.run();
                ran = true;
                //待实现方法,子类实现,
                afterExecute(task, null);
                ++completedTasks;
             catch (RuntimeException ex) 
                if (!ran)
                    afterExecute(task, ex);
                throw ex;
            
         finally 
            runLock.unlock();
        
    

    /**
     * Main run loop
     */
    public void run() 
        try 
            Runnable task = firstTask;
            firstTask = null;
            //firstTask保存当前该线程受理的任务,如果没有任务则会执行getTask方法。
            //具体参看getTask方法,这块用的while循环,说明如果firstTask为空则一直会执行getTask方法,
            //直到getTask方法返回null
            while (task != null || (task = getTask()) != null) 
                runTask(task);
                task = null;
            
         finally 
        //pollSize--
            workerDone(this);
        
    

“`java
Runnable getTask()
for (;;)
try
int state = runState;
//返回null
if (state > SHUTDOWN)
return null;
Runnable r;
//执行了shutdown方法,则需要吧workQueue中的任务执行完
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
//设置了keepAliveTime或者workQueue中有可能有待处理的
//任务(poolSize > corePoolSize 表示如果有任务出来,会往workQueue中放,或者直接创建线程 –maxPoolSize>poolSize>corePoolSize)
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
//如果keepAliaveTime的时间没拿到任务,r就为null,则该Worker就销毁了执行workerCanExit方法
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
//直接阻塞等到workQueue的任务
r = workQueue.take();
if (r != null)
return r;
if (workerCanExit())
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;

// Else retry
catch (InterruptedException ie)
// On interruption, re-check runState



worker的代码相对比较好理解。排版很乱,大概写一下!

线程池经常用,大概写一下,好多代码没贴出来,可以翻看jdk源码

以上是关于浅析ThreadPoolExecutor的主要内容,如果未能解决你的问题,请参考以下文章

ThreadPoolExecutor八种拒绝策略浅析

ThreadPoolExecutor任务提交过程源码浅析

Java 线程池 ThreadPoolExecutor 八种拒绝策略浅析

高并发之——不得不说的线程池与ThreadPoolExecutor类浅析

ThreadPoolExecutor详解

CallableFutureFutureTask浅析