java中的ExecutorCompletionService原理解析
Posted 乔不思
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java中的ExecutorCompletionService原理解析相关的知识,希望对你有一定的参考价值。
ExecutorCompletionService 是将 Executor和BlockQueue结合的jdk类,其实现的主要目的是:提交任务线程,每一个线程任务直线完成后,将返回值放在阻塞队列中,然后可以通过阻塞队列的take()方法返回 对应线程的执行结果!!
案例:
public class TestExecutorCompleteService
public static void main(String[] args) throws InterruptedException, ExecutionException
int num = 9;
ExecutorCompletionService<String> executorCompletionService = new ExecutorCompletionService<>(MyThreadPool.getExecutor());
for(int i=0;i<=num;i++)
Thread.sleep(100l);
executorCompletionService.submit(new Task(i));
for(int i=0;i<=num;i++)
System.out.println(executorCompletionService.take().get());
MyThreadPool.getExecutor().shutdown();
class MyThreadPool
private static class exe
private static ExecutorService executor = Executors.newCachedThreadPool();
private MyThreadPool()
public static ExecutorService getExecutor()
return exe.executor;
class Task implements Callable<String>
private volatile int i;
Task(int i)
this.i = i;
@Override
public String call() throws Exception
Thread.sleep(1000l);
return Thread.currentThread().getName() + "任务 :" +i;
执行结果:
源码分析:
类的定义,类头上已经清晰的解释了类的作用
* void solve(Executor e,
* Collection<Callable<Result>> solvers)
* throws InterruptedException
* CompletionService<Result> ecs
* = new ExecutorCompletionService<Result>(e);
* int n = solvers.size();
* List<Future<Result>> futures
* = new ArrayList<Future<Result>>(n);
* Result result = null;
* try
* for (Callable<Result> s : solvers)
* futures.add(ecs.submit(s));
* for (int i = 0; i < n; ++i)
* try
* Result r = ecs.take().get();
* if (r != null)
* result = r;
* break;
*
* catch (ExecutionException ignore)
*
*
* finally
* for (Future<Result> f : futures)
* f.cancel(true);
*
*
* if (result != null)
* use(result);
* </pre>
*/
public class ExecutorCompletionService<V> implements CompletionService<V>
private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
/**
* FutureTask extension to enqueue upon completion
*/
private class QueueingFuture extends FutureTask<Void>
QueueingFuture(RunnableFuture<V> task)
super(task, null);
this.task = task;
protected void done() completionQueue.add(task);
private final Future<V> task;
构造方法:构造executor和blockqueue
/**
* Creates an ExecutorCompletionService using the supplied
* executor for base task execution and a
* @link LinkedBlockingQueue as a completion queue.
*
* @param executor the executor to use
* @throws NullPointerException if executor is @code null
*/
public ExecutorCompletionService(Executor executor)
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
/**
* Creates an ExecutorCompletionService using the supplied
* executor for base task execution and the supplied queue as its
* completion queue.
*
* @param executor the executor to use
* @param completionQueue the queue to use as the completion queue
* normally one dedicated for use by this service. This
* queue is treated as unbounded -- failed attempted
* @code Queue.add operations for completed tasks cause
* them not to be retrievable.
* @throws NullPointerException if executor or completionQueue are @code null
*/
public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue)
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
执行submit方法:这里可以看出 构造了一个QueueingFuture对象,感觉这个是整个机制的核心,看QueueingFuture的原码不难发现,他继承的是FurtrueTask对象,重写了done()方法(将excutor执行的future存放在queue中),FutureTask 继承了Futrue和Runnable接口。可以理解FutureTask就是AES的单线程版本,但是不具有提交线程的能力~~~
public Future<V> submit(Callable<V> task)
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
然后我们看看这个done方法是在哪里被调用执行的:
FutureTask接收的都是Callable对象,如果是Runnable对象也将其转换成Callable。Callable的执行底层还是执行走的Runnable
Executors类的一个内部类
/**
* A callable that runs given task and returns given result
*/
static final class RunnableAdapter<T> implements Callable<T>
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result)
this.task = task;
this.result = result;
public T call()
task.run();
return result;
FutureTask的run方法:
public void run()
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try
Callable<V> c = callable;
if (c != null && state == NEW)
V result;
boolean ran;
try
result = c.call();
ran = true;
catch (Throwable ex)
result = null;
ran = false;
setException(ex);
if (ran)
set(result);
finally
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
其中set(result)方法和setException方法调用了done方法,所以在线程执行完成后会间接调用 done方法,然后将线程执行返回的future对象放在blockqueue中
个人见解:
ExecutorCompletionService 这个类的设计还是有问题的,可能不是Doug Lea写的缘故,没有达到应有的抽象,感觉封装的很邋遢。
感觉对CompletionService的定义很模糊,既包含ExecuterService的submit方法又包含BlockQueue的对应放啊。感觉还能在外围对他进行抽象,让其继承ExecuterService和BlockQueue,作者也有可能是觉得这样实现这个接口代价太大的缘故。。但是太大也可以让抽象实现一部分方法,然后继承抽象哈哈~~
再说说ExecuterCompletionService吧,既包含了ExxecutorService有包含了AES,还包含了BlockQueue。。。更是觉得得抽象了。。。
能力有限,只能成为吐槽者,不能成为真正的实现者。。。
以上是关于java中的ExecutorCompletionService原理解析的主要内容,如果未能解决你的问题,请参考以下文章