深入理解ThreadPoolExecutor第一弹
Posted 风在哪
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入理解ThreadPoolExecutor第一弹相关的知识,希望对你有一定的参考价值。
从源头解析ThreadPoolExecutor第一弹—Executor&ExecutorService&AbstractExecutorService
首先我们来看看Thread PoolExecutor的继承关系:
从上图中可以看出,ThreadPoolExecutor继承自AbstractExecutorService,而AbstractExecutorService又实现了ExecutorService接口,ExecutorService接口继承自Executor。
那么我们今天先来看看Executor和ExecutorService这两个接口为我们定义了哪些方法,以及这些方法的作用。
Executor接口
首先来看看Executor接口,这个接口比较简单,它只包含一个方法execute,这个方法就是在未来的某个时刻执行给定的任务。
来看看这个类包含的注释吧:
/*
这个接口的实现类是执行提交的Runnable任务的对象。
这个接口提供了策略和机制相分离的方法,将任务的提交以及如何运行每个任务相分离,包括了线程的使用和调度细节等。
我们通常使用Executor而不是直接创建线程
例如,对于任务集合中的每个任务,我们通常不是调用
new Thread(new (RunnableTask())).start()方法,而可能会这样使用:
Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
但是,Executor接口不强制要求执行是异步的。在一些简单的情况下,执行者可以在调用者的线程中立即运行提交的任务:
class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}
更典型的是,任务是在调用方线程以外的线程中执行的。下面的执行器为每个任务生成一个新线程:
class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}
许多执行器实现对任务调度的方式和时间施加了某种限制
下面的executor将任务提交序列化到第二个executor,说明了一个复合executor
class SerialExecutor implements Executor {
final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
final Executor executor;
Runnable active;
SerialExecutor(Executor executor) {
this.executor = executor;
}
public synchronized void execute(final Runnable r) {
tasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (active == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((active = tasks.poll()) != null) {
executor.execute(active);
}
}
}
该接口的子接口ExecutorService提供了更具扩展性的接口
ThreadPoolExecutor类提供了具有扩展性的线程池实现
Executors类提供了为我们创建执行器的工厂方法
在阿里巴巴开发手册中明确指出不允许使用Executors来创建线程池,
因为Executors可能会导致资源耗尽,引起OOM的错误,这个后续再分析
*/
public interface Executor {
/*
执行给定的命令。
这个命令可能由一个新的线程执行,或者由线程池中的线程执行,或者由调用它的线程执行,具体由其实现类决定。
*/
void execute(Runnable command);
}
通过上述源码及注释我们可以发现,这个接口定义了执行任务的顶层抽象接口,其实现类决定具体如何执行任务。
ExecutorService接口
相比于Executor来说,ExecutorService接口提供了更多可扩展的方法,先来一览有哪些方法:
继续来看注释:
/*
ExecutorService接口提供了管理线程终止的方法
并且提供了用于追踪一个或多个异步任务进度的Future接口
ExecutorService可以被关闭,这回导致它拒绝新的任务,它提供了两种不同的关闭方法。
shutdown方法允许关闭之前执行之前提交的任务
shutdownow方法会禁止等待的任务执行并且尝试停止正在执行的任务
ExecutorService终止时,执行器没有执行的任务、没有等待执行的任务、没有新提交的任务
应关闭未使用的ExecutorService以允许回收其资源
submit方法基于Executor.execute(Runnable)方法创建并且返回一个Future实例,Future能用于取消执行或者等待任务执行完成
invokeAny和invokeAll方法最常用的情况就是:
执行一组任务,然后等待至少一个或者全部任务完成
内存一致性影响:在将可运行或可调用的任务提交给ExecutorService之前,线程中的操作发生在该任务执行的任何操作之前,该操作的结果通过Future.get()方法获取
*/
public interface ExecutorService extends Executor {
/*
开始进行有序关闭,在此过程中执行以前提交的任务,但是不再接收新的任务
如果已经关闭,调用该方法将没有额外的影响
*/
void shutdown();
/*
尝试停止所有正在执行的任务,停止处理正在等待的任务
并返回等待执行的任务列表
该方法不等待活跃的任务终止
除了尽最大努力停止处理正在执行的任务之外,没有其他保证
例如,通常的实现是通过Thread.interrupt方法终止线程,所以那些无法响应中断的任务可能不会终止
*/
List<Runnable> shutdownNow();
/*
判断执行器有没有被关闭
如果已经关闭了返回true,否则返回false
*/
boolean isShutdown();
/*
如果关闭后所有的任务都完成,那就返回true
注意,除非调用shutdown或者shutdownnow,否则isTerminated不会返回true
*/
boolean isTerminated();
/*
在收到关闭请求、超时或当前线程中断以后,
会阻塞到完成全部执行的任务
如果执行器正常终止则返回true,如果超时未终止返回false
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
/*
提交一个返回值的任务去执行,并且返回代表执行结果的Future
当任务成功执行后,Future的get方法将会返回任务的执行结果
如果你想立刻阻塞等待任务的执行,可以使用如下形式:
result = exec.submit(aCallable).get();
*/
<T> Future<T> submit(Callable<T> task);
/*
提交Runnable任务去执行,返回代表任务的Future
当任务成功执行后,Future的get方法将会返回任务的执行结果
result是返回结果的类型
*/
<T> Future<T> submit(Runnable task, T result);
/*
提交任务返回Future对象
*/
Future<?> submit(Runnable task);
/*
执行给定的所有任务,当所有任务完成以后返回持有这些任务的状态和结果的Future对象列表。
注意,完成的任务可能是正常终止或者通过引发异常终止
如果在执行此操作时,修改了给定集合,那么此方法的结果是未定义的
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
/*
执行所有的任务,当任务都执行完成以后或者达到了超时时间,就返回Future列表
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
/*
执行给定的所有任务,如果任何一个任务执行完毕,返回这个成功执行的任务的结果(注意,不能是因为异常终止的任务)
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
/*
如果任何任务在给定超时时间内执行完成,就返回该成功执行的任务的结果
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
可以看出,该接口的方法作用如下:
- 终止线程池的运行,并判断其运行的状态,是否在关闭,是否终止成功
- 提交任务,向线程池提交我们的任务,并获得Future对象,等任务执行完成以后获得结果
- 执行所有的任务,返回对应的Future列表
- 执行所有任务,返回第一个执行成功的任务的结果
AbstractExecutorService
AbstractExecutorService提供了ExecutorService执行方法的默认实现。
通过newTaskFor方法返回的RunnableFuture对象,AbstractExecutorService实现了submit、invokeAny、invokeAll方法。
例如,submit(Runnable)方法通过Runnable构造了RunnableFuture对象用于执行和返回。
其子类可能会覆盖newTaskFor方法用以返回RunnableFuture,而不是FutureTask。
RunnableFuture
首先我们来看看RunnableFuture接口。
RunnableFuture接口继承了Runnable接口和Future接口,其中Runnable接口是我们实现线程的几种方式之一,这里就不再介绍。
首先来看看Future接口
Future
Future的实现类代表了异步任务的结果,它提供的方法是为了判断任务是否执行完成,等待任务执行完成,或者获得任务的结果。
仅当任务完成时可以使用get方法获得任务的结果,否则调用get将会阻塞直到任务完成。
如果想取消任务可以调用cancel方法。
其他的方法是为了判断任务是否执行完成或者已经取消。任务一旦完成就不能取消。如果为了可取消性而使用Future,但是没有提供有用的结果,可以声明为Future<?>类型,并将任务的返回结果置为null。
详细的方法介绍:
public interface Future<V> {
/*
尝试取消执行的任务,如果任务已经执行完成或者由于某种原因无法取消将会失败。
如果成功取消,而且cancel调用时任务还没有启动,任务将永远不会再运行。
如果此时任务已经启动,那么就由mayInterruptIfRunning参数的值决定是否通过中断执行任务的线程来尝试停止任务
该方法返回后,后续调用isDone方法总是会返回true
如果cancel方法调用后返回true,那么isCanaelled方法也会返回true
当mayInterruptIfRunning为true时,正在运行任务的线程运行被中断,否则就允许该任务执行完成
*/
boolean cancel(boolean mayInterruptIfRunning);
/*
如果任务成功取消就返回true
*/
boolean isCancelled();
/*
如果任务完成或者取消都回返回true
正常结束、异常或者取消这些情况都算完成
*/
boolean isDone();
/*
等待任务执行结束,获取任务执行的结果
*/
V get() throws InterruptedException, ExecutionException;
/*
在给定的时间内等待任务执行的结束
到达规定时间后,无论任务是否结束都会结束get方法
如果此时结果可用,就获取任务的结果
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Future的主要功能就是获取它对应任务的执行结果,或者取消任务,判断任务的状态等。
重回RunnableFuture
RunnableFuture只定义了一个方法,也就是run()方法,run()方法的成功执行意味着Future执行完成,并且允许通过Future获得结果。
RunnableFuture继承了Future和Runnable接口,表示它具有这两个接口的特性,可以被线程运行,也可以获得对应的结果等。
AbstractExecutorService
首先来看看submit相关的方法:
/*
根据给定的Runnable实例和默认值构造RunnableFuture实例
其中value为future返回的默认值
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
/*
根据给定的callable实例构造RunnableFuture实例
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
/*
提交任务,调用子类实现的execute()方法运行这个任务
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
/*
提交任务,调用子类实现的execute()方法运行这个任务
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
/*
提交任务,调用子类实现的execute()方法运行这个任务
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
doInvokeAny()方法
AbstractExecutorService还实现了invokeAny和invokeAll方法,invokeAny方法主要调用了doInvokeAny方法进行实现,所以这里主要讲解doInvokeAny方法:
/*
执行所有的任务,返回第一个执行完成的任务的结果
*/
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
// 如果没有任务需要执行就抛出异常
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
// 根据任务的数量创建存储Future的ArrayList
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
// 通过AbstractExecutorService自身构造ExecutorCompletionService对象
// ExecutorCompletionService通过给定的Executor执行任务
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
// For efficiency, especially in executors with limited
// parallelism, check to see if previously submitted tasks are
// done before submitting more of them. This interleaving
// plus the exception mechanics account for messiness of main
// loop.
try {
// Record exceptions so that if we fail to obtain any
// result, we can throw the last exception we got.
ExecutionException ee = null;
// 获取任务执行的截止时间,如果为0L的话说明没有时间限制
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 获得任务的迭代器
Iterator<? extends Callable<T>> it = tasks.iterator();
// Start one task for sure; the rest incrementally
// 执行第一个任务,并将其返回的Future对象放入ArrayList中
// 从这里可以看出是顺序执行tasks中的任务
futures.add(ecs.submit(it.next()));
// 减少要执行的任务的数量
--ntasks;
// 正在运行的任务
int active = 1;
// 自旋执行任务
for (;;) {
// 获取并移除下一个将要完成的任务
Future<T> f = ecs.poll();
// 如果没有将要完成的任务
if (f == null) {
// 如果还有时间,那就继续提交新任务
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
// 如果正在执行的任务数为0,就跳出此次循环
else if (active == 0)
break;
// 如果有时间要求,就获取在规定时间内完成的任务
else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
// 如果没有在规定时间内完成的任务,就抛出异常
if (f == null)
throw new TimeoutException();
// 减少等待的时间
nanos = deadline - System.nanoTime();
}
else
// 如果上述条件都不满足,就调用take方法,阻塞到任务执行完成返回future对象
f = ecs.take();
}
// 如果future不为null,那就调用get方法返回结果
if (f != null) {
--active;
try {
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
// 取消执行其他已经提交的任务
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
通过上述源码的分析,我们可以发现,doInvokeAny方法会自旋的执行任务,直到获取到第一个成功执行的任务才会将任务的结果返回。结果成功返回前,doInvokeAny在自旋期间还会继续提交任务,所以最后还会取消执行之前提交的任务。
invokeAny方法会调用doInvokeAny方法,所以这里就不再一一介绍。
invokeAll()方法
// 执行所有的任务,并返回这些任务的执行结果
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
// 如果任务为null就抛出异常
if (tasks == null)
throw new NullPointerException();
// 根据任务的数量创建存储Future对象的List
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
// 标识是否需要取消执行的任务
boolean done = false;
try {
// 遍历给定的任务,通过newTaskFor将其包装成RunnableFuture对象,并加入到存储Future对象的列表中
// 同时执行这些任务
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
// 执行任务
execute(f);
}
// 遍历所有任务对应的Future对象
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
// 如果任务没有完成,调用其get方法
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
// done为ture时表示不需要取消正在执行的任务
done = true;
return futures;
} finally {
// 如果done为false
if (!done)
// 取消正在执行的任务
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
// 返回在规定时间内成功提交的任务关联的Future对象
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean以上是关于深入理解ThreadPoolExecutor第一弹的主要内容,如果未能解决你的问题,请参考以下文章