Java 源码解析 ---- ExecutorCompletionService

Posted wenniuwuren

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 源码解析 ---- ExecutorCompletionService相关的知识,希望对你有一定的参考价值。

一、具体例子

先使用三种方法比较并发结果异同:
方法一:提交后等待 future 结果返回

方法二:自己维护一个Collection保存submit方法返回的Future,然后在主线程中遍历这个Collection并调用Future的get()方法取到线程的返回值。

方法三:使用CompletionService类,它整合了Executor和BlockingQueue的功能。你可以将Callable任务提交给它去执行,然后使用类似于队列中的take方法获取线程的返回值。

import java.util.Random;
import java.util.concurrent.*;

/**
 * Created by mianse.zyb on 2019/4/30
 */
public class ConcurrentTest 

    static final int TOTAL_TASK = 6;

    private static final ExecutorService executorService = new ThreadPoolExecutor(16, 16,
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>());

    BlockingQueue<Future<String>> blockingQueue = new LinkedBlockingQueue<Future<String>>();

    public static void main(String[] args) throws Exception
        ConcurrentTest concurrentTest = new ConcurrentTest();

        long currentTime = System.currentTimeMillis();
        concurrentTest.testConcurrentNotWithQueue();
        System.out.println("testConcurrentNotWithQueue cost=" + (System.currentTimeMillis() - currentTime));
        System.out.println("--------------------------------------------");

        long currentTime1 = System.currentTimeMillis();
        concurrentTest.testConcurrentWithQueue();
        System.out.println("testConcurrentWithQueue cost=" + (System.currentTimeMillis() - currentTime1));
        System.out.println("--------------------------------------------");

        long currentTime2 = System.currentTimeMillis();
        concurrentTest.testConcurrentWithCompletionServic();
        System.out.println("testConcurrentWithCompletionServic cost=" + (System.currentTimeMillis() - currentTime2));

        executorService.shutdown();
    

    public void testConcurrentNotWithQueue() throws Exception 
        // 异步提交任务
        for (int i = 0; i < TOTAL_TASK; i++) 
            Future<String> future = executorService.submit(new MyThread("thread-" + i));
            System.out.println("testConcurrentNotWithQueue:" + future.get());
        
    

    public void testConcurrentWithQueue() throws Exception 
        // 异步提交任务
        for (int i = 0; i < TOTAL_TASK; i++) 
            Future<String> future = executorService.submit(new MyThread("thread-" + i));
            blockingQueue.add(future); // 如果不加入队列,则需要一个一个等待获取结果
        

        // 获取任务结果
        for (int i = 0; i < TOTAL_TASK; i++) 
            System.out.println("testConcurrentWithQueue:" + blockingQueue.take().get());
        


    

    public void testConcurrentWithCompletionServic() throws Exception 
        CompletionService<String> completionService = new ExecutorCompletionService<String>(executorService);

        // 异步提交任务
        for (int i = 0; i < TOTAL_TASK; i++) 
            completionService.submit(new MyThread("thread-" + i));
        

        // 获取任务结果
        for (int i = 0; i < TOTAL_TASK; i++) 
            Future<String> future = completionService.take();
            System.out.println("testConcurrentWithCompletionServic:" + future.get());
        
    

    class MyThread implements Callable<String> 
        private String name;

        public MyThread(String name) 
            this.name = name;
        

        @Override
        public String call() 
            int sleepTime =  new Random().nextInt(1000);
            try 
                Thread.sleep(sleepTime);
             catch (InterruptedException e) 
                e.printStackTrace();
            

            // 返回给调用者的值
            String str = name + " sleep time:" + sleepTime;
            System.out.println(name + " execute finished");

            return str;
        
    

初步结论:

方法一,阻塞等待每一个任务结果返回,是个串行结果

方法二,自己创建一个queue存储Future并循环调用其返回结果的时候,它是按任务提交的顺序返回。因为future.get()是等待计算结果的阻塞方法,所以只能根据线程提交顺序返回结果

方法三,使用CompletionService来维护处理线程的返回结果,主线程总是能够拿到最先完成的任务的返回值,与任务提交顺序无关。

 

二、源码解析

可以看到 ExecutorCompletionService 中:重写了 FutureTask 的 done 方法,在任务执行完成后把任务丢进队列,这样就实现了哪个任务先完成先输出结果

/**
     * FutureTask extension to enqueue upon completion
     */
    private class QueueingFuture extends FutureTask<Void> 
        QueueingFuture(RunnableFuture<V> task) 
            super(task, null);
            this.task = task;
        
        protected void done()  completionQueue.add(task); 
        private final Future<V> task;
    

 

以上是关于Java 源码解析 ---- ExecutorCompletionService的主要内容,如果未能解决你的问题,请参考以下文章

Java并发源码解析

java集合 源码解析 学习手册

Java Executor源码解析—ThreadPoolExecutor线程池其他方法的源码

Java集合类源码解析:AbstractMap

JAVA常用集合源码解析系列-ArrayList源码解析(基于JDK8)

Java Executor源码解析—Executors线程池工厂以及四大内置线程池