JUC系列Executor框架之CompletionService
Posted 顧棟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC系列Executor框架之CompletionService相关的知识,希望对你有一定的参考价值。
【JUC系列】Executor框架之CompletionService
文章目录
CompletionService
这是一个接口,提供一种服务,它将新的异步任务的生产与已完成任务的结果的消费分离。生产者提交任务以供执行。消费者接受已完成的任务并按照他们完成的顺序处理他们的结果。
例如,CompletionService 可用于管理异步 I/O,其中执行读取的任务在程序或系统的一个部分中提交,然后在读取完成时在程序的不同部分执行操作,可能在与他们要求的顺序不同。通常,CompletionService 依赖一个单独的 Executor 来实际执行任务,在这种情况下,CompletionService 只管理一个内部完成队列。 ExecutorCompletionService 类提供了这种方法的实现。
内存一致性效果:在将任务提交到 CompletionService 之前线程中的操作发生在该任务所采取的操作之前,这反过来又发生在从相应的 take() 成功返回之后的操作。
接口提供了以下方法
方法名 | 描述 |
---|---|
Future submit(Callable task) | 提交一个返回值的任务以供执行,并返回一个表示该任务待处理结果的 Future。 完成后,可以采取或轮询此任务。 |
Future submit(Runnable task, V result) | 提交 Runnable 任务以执行并返回代表该任务的 Future。 完成后,可以take或poll此任务。 |
Future take() throws InterruptedException | 检索并删除代表下一个已完成任务的 Future,如果还没有,则等待。 |
Future poll() | 检索并删除表示下一个已完成任务的 Future,如果不存在,则返回 null。 |
Future poll(long timeout, TimeUnit unit) throws InterruptedException | 检索并删除代表下一个已完成任务的 Future,如果还没有,则在必要时等待指定的等待时间。 |
ExecutorCompletionService
ExecutorCompletionService实现了CompletionService。
一个 CompletionService,它使用提供的 Executor 来执行任务。 此类安排提交的任务在完成后放置在使用 take 可访问的队列中。 该类足够轻量级,适合在处理任务组时临时使用。
成员变量
// 执行任务的线程池
private final Executor executor;
// ???
private final AbstractExecutorService aes;
// 任务完成会存放在该阻塞队列中
private final BlockingQueue<Future<V>> completionQueue;
内部类QueueingFuture
继承了FutureTask
private class QueueingFuture extends FutureTask<Void>
QueueingFuture(RunnableFuture<V> task)
super(task, null);
this.task = task;
// 执行成功,将任务添加到completionQueue队列中。主要在FutureTask执行finishCompletion()时,调用done。
protected void done() completionQueue.add(task);
private final Future<V> task;
构造函数
参数executor:传入的线程池,用来执行任务;默认的队列是LinkedBlockingQueue。
public ExecutorCompletionService(Executor executor)
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
参数executor:传入的线程池,用来执行任务;
参数completionQueue:用来记录执行结果的阻塞列表
public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue)
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
任务执行
public Future<V> submit(Callable<V> task)
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
// 将任务转换成QueueingFuture
executor.execute(new QueueingFuture(f));
return f;
public Future<V> submit(Runnable task, V result)
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
// 将任务转换成QueueingFuture
executor.execute(new QueueingFuture(f));
return f;
ExecutorCompletionService的newTaskFor,将Callable或Runnable任务转成FutureTask。
若线程池不为AbstractExecutorService则,直接new FutureTask;若aes不为null,则调用AbstractExecutorService的newTaskFor。实际也是通过new FutureTask实现。
private RunnableFuture<V> newTaskFor(Callable<V> task)
if (aes == null)
return new FutureTask<V>(task);
else
return aes.newTaskFor(task);
private RunnableFuture<V> newTaskFor(Runnable task, V result)
if (aes == null)
return new FutureTask<V>(task, result);
else
return aes.newTaskFor(task, result);
AbstractExecutorService的newTaskFor
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value)
return new FutureTask<T>(runnable, value);
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
return new FutureTask<T>(callable);
使用案例
场景一:
假设您有一组求解某个问题的求解器,每个求解器返回某种类型的值 Result,并希望同时运行它们,处理每个返回非空值的结果,在某些方法中使用(Result r)。 你可以这样写:
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
public class ExecutorCompletionServiceDemo
public static void main(String[] args)
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
5,
10,
3,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
new ThreadPoolExecutor.AbortPolicy());
CompletionService<Integer> completionService = new ExecutorCompletionService<>(threadPoolExecutor);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] is beginning. ");
List<Future<Integer>> futures = new ArrayList<>(3);
futures.add(completionService.submit(() ->
Thread.sleep(3000);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");
return 1;
));
futures.add(completionService.submit(() ->
Thread.sleep(2000);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");
return 2;
));
futures.add(completionService.submit(() ->
Thread.sleep(1000);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");
return 3;
));
Integer r = 0;
try
for (int i = 0; i < 3; i++)
r = completionService.take().get();
System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] " + r);
catch (InterruptedException | ExecutionException e)
e.printStackTrace();
finally
System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] is ending.");
threadPoolExecutor.shutdown();
执行结果
[14:03:59:059--main] is beginning.
[14:04:00:000--pool-1-thread-3] sleep is over.
[14:04:00:000--main] 3
[14:04:01:001--pool-1-thread-2] sleep is over.
[14:04:01:001--main] 2
[14:04:02:002--pool-1-thread-1] sleep is over.
[14:04:02:002--main] 1
[14:04:02:002--main] is ending.
场景二:
假设您想使用任务集的第一个非空结果,忽略任何遇到异常的任务,并在第一个任务准备好时取消所有其他任务:
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
public class ExecutorCompletionServiceDemo
public static void main(String[] args)
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
5,
10,
3,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
new ThreadPoolExecutor.AbortPolicy());
CompletionService<Integer> completionService = new ExecutorCompletionService<>(threadPoolExecutor);
List<Future<Integer>> futures = new ArrayList<>(3);
futures.add(completionService.submit(() ->
Thread.sleep(3000);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");
return 1;
));
futures.add(completionService.submit(() ->
Thread.sleep(2000);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");
return 2;
));
futures.add(completionService.submit(() ->
Thread.sleep(1000);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");
return 3;
));
Integer r = 0;
try
for (int i = 0; i < 3; i++)
r = completionService.take().get();
// 如果结果获取到了,就退出for
if (r != null)
break;
catch (InterruptedException | ExecutionException e)
e.printStackTrace();
finally
for (Future<Integer> f : futures)
System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] " + f.isDone());
f.cancel(true);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(JUC系列Executor框架之概览