Executor Framework分析 概述
Posted ZhangJianIsAStark
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Executor Framework分析 概述相关的知识,希望对你有一定的参考价值。
本篇博客主要记录一下,Java中Executor相关的并发框架涉及的内容。
Executor框架提供了一种标准的方法,将任务的提交过程和执行过程解耦。
Executor框架中比较主要的接口和类的关系如下所示:
Executor接口用于约定异步执行任务的接口。
该接口仅定义一个接受Runnable对象的函数,如下所示:
public interface Executor
void execute(Runnable command);
ExecutorService接口继承Executor,是一个功能更强大的接口。
我们列举一下其中比较有代表性的接口:
public interface ExecutorService extends Executor
//shutdown和shutdownNow均表示停止任务的执行
//以ThreadPoolExecutor为例,二者的区别是:
//shutdown只是将线程池的状态设置为SHUTWDOWN状态,
//正在执行的任务会继续执行下去(不保证能完整执行),没有被执行的则中断
//而shutdownNow则是将线程池的状态设置为STOP,正在执行的任务会被停止
//且返回未没被执行任务
//后面再分析ThreadPoolExecutor中相关的源码
void shutdown();
List<Runnable> shutdownNow();
........
//调用shutdown后,可以调用该接口
//该接口将会阻塞,直到所有已执行任务执行完毕
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
//submit的功能与execute类似,除了参数有区别外,
//主要的差别是会返回一个Future对象
//通过Future对象,可以操控执行的task
<T> Future<T> submit(Callable<T> task);
................
//参数中所有Task均执行完毕后,才会返回
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
............
//参数中有一个Task执行完毕后,就会返回
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
......
AbstractExecutorService是一个抽象类,
提供了ExecutorService中部分接口的默认实现。
我们看看其中部分实现的代码:
public abstract class AbstractExecutorService implements ExecutorService
...............
public Future<?> submit(Runnable task)
if (task == null) throw new NullPointerException();
//利用task构造出RunnableFuture, 实际的实现是FutureTask
RunnableFuture<Void> ftask = newTaskFor(task, null);
//实际还是调用execute接口
execute(ftask);
//返回FutureTask,通过ftask可以操控task
return ftask;
...........
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException
.......
ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
try
//提交所有任务
for (Callable<T> t : tasks)
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
for (int i = 0, size = futures.size(); i < size; i++)
Future<T> f = futures.get(i);
//判断是否执行完毕
if (!f.isDone())
//由于get会阻塞
//因此所有的Task均执行完,才会返回
try f.get();
catch (CancellationException ignore)
catch (ExecutionException ignore)
return futures;
catch (Throwable t)
//不过这里真是刚啊,只要异常一把
//就全部cancel
cancelAll(futures);
throw t;
................
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException
try
//这里false和0表示,不考虑超时
return doInvokeAny(tasks, false, 0);
catch (TimeoutException cannotHappen)
assert false;
return null;
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException
......
int ntasks = tasks.size();
......
ArrayList<Future<T>> futures = new ArrayList<>(ntasks);
//ExecutorCompletionService封装了Executor
//同时包含一个LinkedBlockingQueue, 记录执行完的Task
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
.............
try
............
ExecutionException ee = null;
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();
//提交第一个task
futures.add(ecs.submit(it.next()));
//记录待执行的任务
--ntasks;
//记录正在执行的任务
int active = 1;
for (;;)
//取出执行完task对应的future
//若没有task执行完,则返回null
Future<T> f = ecs.poll();
if (f == null)
//没有执行完的,提交下一个任务
if (ntasks > 0)
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
//这种情况相当于都执行完了,但获取结果时异常
else if (active == 0)
break;
//进入这个分支,表明所有的task都已经提交了
//但还有task没执行完,此时有超时限制
else if (timed)
//等待一段时间
f = ecs.poll(nanos, NANOSECONDS);
if (f == null)
throw new TimeoutException();
//更新等待时间
nanos = deadline - System.nanoTime();
else
//进入这个分支,表明所有的task都已经提交了
//但还有task没执行完, 且无超时限制
//利用take阻塞操作,坐等结果
f = ecs.take();
//表明有task执行完毕
if (f != null)
--active;
try
//获取结果,若有异常则继续for循环
//由此可见,有一个task执行完毕且能正常获取结果时,
//invokeAny就能返回结果
return f.get();
catch (ExecutionException eex)
ee = eex;
catch (RuntimeException rex)
ee = new ExecutionException(rex);
if (ee == null)
ee = new ExecutionException();
throw ee;
finally
//返回结果或异常后,最终cancel所有的task
cancelAll(futures);
......
ThreadPoolExecutor继承AbstractExecutorService,
作为线程池的实现类,实现了线程的创建、生命周期管理等方法。
ThreadPoolExecutor值得分析的方法比较多,后续博客再单独分析。
ForkJoinPool是JDK1.7之后引入的,也继承AbstractExecutorService,
同样是线程池的实现类。
正如ForkJoinPool的注释所述,它的优势在于:
若当前线程正在运行的任务,可以被拆分成多个子任务时(例如递归调用),
多个子任务也会被放到多个线程上并发执行。
等多个子任务执行完成之后,ForkJoinPool再将这些执行结果合并起来。
ForkJoinPool值得分析的方法比较多,后续博客再单独分析。
ScheduledExecutorService继承ExecutorService,主要在ExecutorService的基础上,
增加了延迟执行任务及周期性执行任务的能力。
我们列举一下其中比较有代表性的接口:
public interface ScheduledExecutorService extends ExecutorService
//延迟执行command
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
..............
//延迟initialDelay后,以period为间隔周期性地执行command
//直到future取消任务、线程池结束或出现异常
//如果某个command执行的时间超过period,后续的任务将延迟执行(不会并发执行)
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
//延迟initialDelay后,周期性执行command
//前一个command执行完毕后,等待delay后,执行下一个command
//直到future取消任务、线程池结束或出现异常
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
ScheduledThreadPoolExecutor是实现ScheduledExecutorService接口的线程池,
即在ThreadPoolExecutor的基础上,扩展出ScheduledExecutorService指定的能力。
ScheduledThreadPoolExecutor扩展的方法,后续博客再单独分析。
Executors是一个工具类,简化了线程池、线程工厂的创建,
后续博客再单独分析。
至此,我们大致了解Executor Framework中主要类之间的关系,
及部分接口的实现方式。
接下来,我们用几篇博客,依次单独分析下ThreadPoolExecutor、
ScheduledThreadPoolExecutor、ForkJoinPool和Executors的实现。
以上是关于Executor Framework分析 概述的主要内容,如果未能解决你的问题,请参考以下文章
Executor Framework分析 ScheduledThreadPoolExecutor
Executor Framework分析 ScheduledThreadPoolExecutor
Executor Framework分析 ThreadPoolExecutor部分函数分析
Executor Framework分析 ThreadPoolExecutor部分函数分析