深入理解ThreadPoolExecutor第一弹

Posted 风在哪

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入理解ThreadPoolExecutor第一弹相关的知识,希望对你有一定的参考价值。

从源头解析ThreadPoolExecutor第一弹—Executor&ExecutorService&AbstractExecutorService

首先我们来看看Thread PoolExecutor的继承关系:

image-20210417141612799

从上图中可以看出,ThreadPoolExecutor继承自AbstractExecutorService,而AbstractExecutorService又实现了ExecutorService接口,ExecutorService接口继承自Executor。

那么我们今天先来看看Executor和ExecutorService这两个接口为我们定义了哪些方法,以及这些方法的作用。

Executor接口

首先来看看Executor接口,这个接口比较简单,它只包含一个方法execute,这个方法就是在未来的某个时刻执行给定的任务。

来看看这个类包含的注释吧:

/*
这个接口的实现类是执行提交的Runnable任务的对象。
这个接口提供了策略和机制相分离的方法,将任务的提交以及如何运行每个任务相分离,包括了线程的使用和调度细节等。
我们通常使用Executor而不是直接创建线程
例如,对于任务集合中的每个任务,我们通常不是调用
new Thread(new (RunnableTask())).start()方法,而可能会这样使用:
Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());

但是,Executor接口不强制要求执行是异步的。在一些简单的情况下,执行者可以在调用者的线程中立即运行提交的任务:
class DirectExecutor implements Executor {
   public void execute(Runnable r) {
     r.run();
   }
 }
更典型的是,任务是在调用方线程以外的线程中执行的。下面的执行器为每个任务生成一个新线程:
class ThreadPerTaskExecutor implements Executor {
   public void execute(Runnable r) {
     new Thread(r).start();
   }
 }
 
许多执行器实现对任务调度的方式和时间施加了某种限制
下面的executor将任务提交序列化到第二个executor,说明了一个复合executor
class SerialExecutor implements Executor {
   final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
   final Executor executor;
   Runnable active;

   SerialExecutor(Executor executor) {
     this.executor = executor;
   }

   public synchronized void execute(final Runnable r) {
     tasks.offer(new Runnable() {
       public void run() {
         try {
           r.run();
         } finally {
           scheduleNext();
         }
       }
     });
     if (active == null) {
       scheduleNext();
     }
   }

   protected synchronized void scheduleNext() {
     if ((active = tasks.poll()) != null) {
       executor.execute(active);
     }
   }
 }
该接口的子接口ExecutorService提供了更具扩展性的接口
ThreadPoolExecutor类提供了具有扩展性的线程池实现
Executors类提供了为我们创建执行器的工厂方法
	在阿里巴巴开发手册中明确指出不允许使用Executors来创建线程池,
因为Executors可能会导致资源耗尽,引起OOM的错误,这个后续再分析
*/
public interface Executor {
    /*
    执行给定的命令。
    这个命令可能由一个新的线程执行,或者由线程池中的线程执行,或者由调用它的线程执行,具体由其实现类决定。
    */
    void execute(Runnable command);
}

通过上述源码及注释我们可以发现,这个接口定义了执行任务的顶层抽象接口,其实现类决定具体如何执行任务。

ExecutorService接口

相比于Executor来说,ExecutorService接口提供了更多可扩展的方法,先来一览有哪些方法:

image-20210417145833485

继续来看注释:

/*
ExecutorService接口提供了管理线程终止的方法
并且提供了用于追踪一个或多个异步任务进度的Future接口
ExecutorService可以被关闭,这回导致它拒绝新的任务,它提供了两种不同的关闭方法。
shutdown方法允许关闭之前执行之前提交的任务
shutdownow方法会禁止等待的任务执行并且尝试停止正在执行的任务
ExecutorService终止时,执行器没有执行的任务、没有等待执行的任务、没有新提交的任务
应关闭未使用的ExecutorService以允许回收其资源

submit方法基于Executor.execute(Runnable)方法创建并且返回一个Future实例,Future能用于取消执行或者等待任务执行完成
invokeAny和invokeAll方法最常用的情况就是:
执行一组任务,然后等待至少一个或者全部任务完成

内存一致性影响:在将可运行或可调用的任务提交给ExecutorService之前,线程中的操作发生在该任务执行的任何操作之前,该操作的结果通过Future.get()方法获取
*/
public interface ExecutorService extends Executor {
    /*
    开始进行有序关闭,在此过程中执行以前提交的任务,但是不再接收新的任务
    如果已经关闭,调用该方法将没有额外的影响
    */
    void shutdown();
    /*
    尝试停止所有正在执行的任务,停止处理正在等待的任务
    并返回等待执行的任务列表
    该方法不等待活跃的任务终止
    除了尽最大努力停止处理正在执行的任务之外,没有其他保证
    例如,通常的实现是通过Thread.interrupt方法终止线程,所以那些无法响应中断的任务可能不会终止
    */
    List<Runnable> shutdownNow();
    /*
    判断执行器有没有被关闭
    如果已经关闭了返回true,否则返回false
    */
    boolean isShutdown();
    /*
    如果关闭后所有的任务都完成,那就返回true
    注意,除非调用shutdown或者shutdownnow,否则isTerminated不会返回true
    */
    boolean isTerminated();
    /*
    在收到关闭请求、超时或当前线程中断以后,
    会阻塞到完成全部执行的任务
    
    如果执行器正常终止则返回true,如果超时未终止返回false
    */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    
    /*
    提交一个返回值的任务去执行,并且返回代表执行结果的Future
    当任务成功执行后,Future的get方法将会返回任务的执行结果
    如果你想立刻阻塞等待任务的执行,可以使用如下形式:
    result = exec.submit(aCallable).get();
    */
    <T> Future<T> submit(Callable<T> task);
    /*
    提交Runnable任务去执行,返回代表任务的Future
    当任务成功执行后,Future的get方法将会返回任务的执行结果
    result是返回结果的类型
    */
    <T> Future<T> submit(Runnable task, T result);
    /*
    提交任务返回Future对象
    */
    Future<?> submit(Runnable task);
    /*
    执行给定的所有任务,当所有任务完成以后返回持有这些任务的状态和结果的Future对象列表。
    注意,完成的任务可能是正常终止或者通过引发异常终止
    如果在执行此操作时,修改了给定集合,那么此方法的结果是未定义的
    */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    /*
    执行所有的任务,当任务都执行完成以后或者达到了超时时间,就返回Future列表
    */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    /*
    执行给定的所有任务,如果任何一个任务执行完毕,返回这个成功执行的任务的结果(注意,不能是因为异常终止的任务)
    */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    /*
    如果任何任务在给定超时时间内执行完成,就返回该成功执行的任务的结果
    */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

可以看出,该接口的方法作用如下:

  • 终止线程池的运行,并判断其运行的状态,是否在关闭,是否终止成功
  • 提交任务,向线程池提交我们的任务,并获得Future对象,等任务执行完成以后获得结果
  • 执行所有的任务,返回对应的Future列表
  • 执行所有任务,返回第一个执行成功的任务的结果

AbstractExecutorService

AbstractExecutorService提供了ExecutorService执行方法的默认实现。

通过newTaskFor方法返回的RunnableFuture对象,AbstractExecutorService实现了submit、invokeAny、invokeAll方法。

例如,submit(Runnable)方法通过Runnable构造了RunnableFuture对象用于执行和返回。

其子类可能会覆盖newTaskFor方法用以返回RunnableFuture,而不是FutureTask。

RunnableFuture

首先我们来看看RunnableFuture接口。

RunnableFuture接口继承了Runnable接口和Future接口,其中Runnable接口是我们实现线程的几种方式之一,这里就不再介绍。

首先来看看Future接口

Future

Future的实现类代表了异步任务的结果,它提供的方法是为了判断任务是否执行完成,等待任务执行完成,或者获得任务的结果。

仅当任务完成时可以使用get方法获得任务的结果,否则调用get将会阻塞直到任务完成。

如果想取消任务可以调用cancel方法。

其他的方法是为了判断任务是否执行完成或者已经取消。任务一旦完成就不能取消。如果为了可取消性而使用Future,但是没有提供有用的结果,可以声明为Future<?>类型,并将任务的返回结果置为null。

详细的方法介绍:

public interface Future<V> {
    /*
    尝试取消执行的任务,如果任务已经执行完成或者由于某种原因无法取消将会失败。
    如果成功取消,而且cancel调用时任务还没有启动,任务将永远不会再运行。
    如果此时任务已经启动,那么就由mayInterruptIfRunning参数的值决定是否通过中断执行任务的线程来尝试停止任务
    该方法返回后,后续调用isDone方法总是会返回true
    如果cancel方法调用后返回true,那么isCanaelled方法也会返回true
    
    当mayInterruptIfRunning为true时,正在运行任务的线程运行被中断,否则就允许该任务执行完成
    */
    boolean cancel(boolean mayInterruptIfRunning);
    /*
    如果任务成功取消就返回true
    */
    boolean isCancelled();
    /*
    如果任务完成或者取消都回返回true
    正常结束、异常或者取消这些情况都算完成
    */
    boolean isDone();
    /*
    等待任务执行结束,获取任务执行的结果
    */
    V get() throws InterruptedException, ExecutionException;
    /*
    在给定的时间内等待任务执行的结束
    
    到达规定时间后,无论任务是否结束都会结束get方法
    如果此时结果可用,就获取任务的结果
    */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Future的主要功能就是获取它对应任务的执行结果,或者取消任务,判断任务的状态等。

重回RunnableFuture

RunnableFuture只定义了一个方法,也就是run()方法,run()方法的成功执行意味着Future执行完成,并且允许通过Future获得结果。

image-20210418193214341

RunnableFuture继承了Future和Runnable接口,表示它具有这两个接口的特性,可以被线程运行,也可以获得对应的结果等。

AbstractExecutorService

首先来看看submit相关的方法:

    /*
    根据给定的Runnable实例和默认值构造RunnableFuture实例
    其中value为future返回的默认值
    */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    /*
    根据给定的callable实例构造RunnableFuture实例
    */
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
    /*
    提交任务,调用子类实现的execute()方法运行这个任务
    */
	public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /*
    提交任务,调用子类实现的execute()方法运行这个任务
    */
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /*
    提交任务,调用子类实现的execute()方法运行这个任务
    */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

doInvokeAny()方法

AbstractExecutorService还实现了invokeAny和invokeAll方法,invokeAny方法主要调用了doInvokeAny方法进行实现,所以这里主要讲解doInvokeAny方法:

	/*
	执行所有的任务,返回第一个执行完成的任务的结果
	*/
	private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        // 如果没有任务需要执行就抛出异常
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
       	// 根据任务的数量创建存储Future的ArrayList
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        // 通过AbstractExecutorService自身构造ExecutorCompletionService对象
        // ExecutorCompletionService通过给定的Executor执行任务
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.

        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            ExecutionException ee = null;
            
            // 获取任务执行的截止时间,如果为0L的话说明没有时间限制
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            // 获得任务的迭代器
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // Start one task for sure; the rest incrementally
            // 执行第一个任务,并将其返回的Future对象放入ArrayList中
            // 从这里可以看出是顺序执行tasks中的任务
            futures.add(ecs.submit(it.next()));
            // 减少要执行的任务的数量
            --ntasks;
            // 正在运行的任务
            int active = 1;
			// 自旋执行任务
            for (;;) {
                // 获取并移除下一个将要完成的任务
                Future<T> f = ecs.poll();
                // 如果没有将要完成的任务
                if (f == null) {
                    // 如果还有时间,那就继续提交新任务
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    // 如果正在执行的任务数为0,就跳出此次循环
                    else if (active == 0)
                        break;
                    // 如果有时间要求,就获取在规定时间内完成的任务
                    else if (timed) {
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        // 如果没有在规定时间内完成的任务,就抛出异常
                        if (f == null)
                            throw new TimeoutException();
                        // 减少等待的时间
                        nanos = deadline - System.nanoTime();
                    }
                    else
                        // 如果上述条件都不满足,就调用take方法,阻塞到任务执行完成返回future对象
                        f = ecs.take();
                }
                // 如果future不为null,那就调用get方法返回结果
                if (f != null) {
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }
            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            // 取消执行其他已经提交的任务
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
        }
    }

通过上述源码的分析,我们可以发现,doInvokeAny方法会自旋的执行任务,直到获取到第一个成功执行的任务才会将任务的结果返回。结果成功返回前,doInvokeAny在自旋期间还会继续提交任务,所以最后还会取消执行之前提交的任务。

invokeAny方法会调用doInvokeAny方法,所以这里就不再一一介绍。

invokeAll()方法

	// 执行所有的任务,并返回这些任务的执行结果
	public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        // 如果任务为null就抛出异常
        if (tasks == null)
            throw new NullPointerException();
        // 根据任务的数量创建存储Future对象的List
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        // 标识是否需要取消执行的任务
        boolean done = false;
        try {
            // 遍历给定的任务,通过newTaskFor将其包装成RunnableFuture对象,并加入到存储Future对象的列表中
            // 同时执行这些任务
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                // 执行任务
                execute(f);
            }
            // 遍历所有任务对应的Future对象
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future<T> f = futures.get(i);
                // 如果任务没有完成,调用其get方法
                if (!f.isDone()) {
                    try {
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            // done为ture时表示不需要取消正在执行的任务
            done = true;
            return futures;
        } finally {
            // 如果done为false
            if (!done)
                // 取消正在执行的任务
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }
	// 返回在规定时间内成功提交的任务关联的Future对象
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean以上是关于深入理解ThreadPoolExecutor第一弹的主要内容,如果未能解决你的问题,请参考以下文章

深入理解ThreadPoolExecutor第二弹

深入理解ThreadPoolExecutor第二弹

深入理解ThreadPoolExecutor第三弹

深入理解ThreadPoolExecutor第三弹

深入理解java线程池—ThreadPoolExecutor

深入理解Java线程池:ThreadPoolExecutor