Java 源码解析 ---- ExecutorCompletionService
Posted wenniuwuren
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 源码解析 ---- ExecutorCompletionService相关的知识,希望对你有一定的参考价值。
一、具体例子
先使用三种方法比较并发结果异同:
方法一:提交后等待 future 结果返回
方法二:自己维护一个Collection保存submit方法返回的Future,然后在主线程中遍历这个Collection并调用Future的get()方法取到线程的返回值。
方法三:使用CompletionService类,它整合了Executor和BlockingQueue的功能。你可以将Callable任务提交给它去执行,然后使用类似于队列中的take方法获取线程的返回值。
import java.util.Random;
import java.util.concurrent.*;
/**
* Created by mianse.zyb on 2019/4/30
*/
public class ConcurrentTest
static final int TOTAL_TASK = 6;
private static final ExecutorService executorService = new ThreadPoolExecutor(16, 16,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
BlockingQueue<Future<String>> blockingQueue = new LinkedBlockingQueue<Future<String>>();
public static void main(String[] args) throws Exception
ConcurrentTest concurrentTest = new ConcurrentTest();
long currentTime = System.currentTimeMillis();
concurrentTest.testConcurrentNotWithQueue();
System.out.println("testConcurrentNotWithQueue cost=" + (System.currentTimeMillis() - currentTime));
System.out.println("--------------------------------------------");
long currentTime1 = System.currentTimeMillis();
concurrentTest.testConcurrentWithQueue();
System.out.println("testConcurrentWithQueue cost=" + (System.currentTimeMillis() - currentTime1));
System.out.println("--------------------------------------------");
long currentTime2 = System.currentTimeMillis();
concurrentTest.testConcurrentWithCompletionServic();
System.out.println("testConcurrentWithCompletionServic cost=" + (System.currentTimeMillis() - currentTime2));
executorService.shutdown();
public void testConcurrentNotWithQueue() throws Exception
// 异步提交任务
for (int i = 0; i < TOTAL_TASK; i++)
Future<String> future = executorService.submit(new MyThread("thread-" + i));
System.out.println("testConcurrentNotWithQueue:" + future.get());
public void testConcurrentWithQueue() throws Exception
// 异步提交任务
for (int i = 0; i < TOTAL_TASK; i++)
Future<String> future = executorService.submit(new MyThread("thread-" + i));
blockingQueue.add(future); // 如果不加入队列,则需要一个一个等待获取结果
// 获取任务结果
for (int i = 0; i < TOTAL_TASK; i++)
System.out.println("testConcurrentWithQueue:" + blockingQueue.take().get());
public void testConcurrentWithCompletionServic() throws Exception
CompletionService<String> completionService = new ExecutorCompletionService<String>(executorService);
// 异步提交任务
for (int i = 0; i < TOTAL_TASK; i++)
completionService.submit(new MyThread("thread-" + i));
// 获取任务结果
for (int i = 0; i < TOTAL_TASK; i++)
Future<String> future = completionService.take();
System.out.println("testConcurrentWithCompletionServic:" + future.get());
class MyThread implements Callable<String>
private String name;
public MyThread(String name)
this.name = name;
@Override
public String call()
int sleepTime = new Random().nextInt(1000);
try
Thread.sleep(sleepTime);
catch (InterruptedException e)
e.printStackTrace();
// 返回给调用者的值
String str = name + " sleep time:" + sleepTime;
System.out.println(name + " execute finished");
return str;
初步结论:
方法一,阻塞等待每一个任务结果返回,是个串行结果
方法二,自己创建一个queue存储Future并循环调用其返回结果的时候,它是按任务提交的顺序返回。因为future.get()是等待计算结果的阻塞方法,所以只能根据线程提交顺序返回结果
方法三,使用CompletionService来维护处理线程的返回结果,主线程总是能够拿到最先完成的任务的返回值,与任务提交顺序无关。
二、源码解析
可以看到 ExecutorCompletionService 中:重写了 FutureTask 的 done 方法,在任务执行完成后把任务丢进队列,这样就实现了哪个任务先完成先输出结果
/**
* FutureTask extension to enqueue upon completion
*/
private class QueueingFuture extends FutureTask<Void>
QueueingFuture(RunnableFuture<V> task)
super(task, null);
this.task = task;
protected void done() completionQueue.add(task);
private final Future<V> task;
以上是关于Java 源码解析 ---- ExecutorCompletionService的主要内容,如果未能解决你的问题,请参考以下文章
Java Executor源码解析—ThreadPoolExecutor线程池其他方法的源码