java中的ExecutorCompletionService原理解析

Posted 乔不思

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java中的ExecutorCompletionService原理解析相关的知识,希望对你有一定的参考价值。

ExecutorCompletionService 是将 Executor和BlockQueue结合的jdk类,其实现的主要目的是:提交任务线程,每一个线程任务直线完成后,将返回值放在阻塞队列中,然后可以通过阻塞队列的take()方法返回 对应线程的执行结果!!


案例:

public class TestExecutorCompleteService 

	public static void main(String[] args) throws InterruptedException, ExecutionException 
	
		int num = 9;
		ExecutorCompletionService<String> executorCompletionService = new ExecutorCompletionService<>(MyThreadPool.getExecutor());
		
		for(int i=0;i<=num;i++)
			Thread.sleep(100l);
			executorCompletionService.submit(new Task(i));
		
		for(int i=0;i<=num;i++)
		System.out.println(executorCompletionService.take().get());
		MyThreadPool.getExecutor().shutdown();
	
	
	

class MyThreadPool
	
	 private static class exe
		private static ExecutorService executor = Executors.newCachedThreadPool();
	
	 
	private MyThreadPool()
	 
	public static  ExecutorService getExecutor()
		return exe.executor;
	


class Task implements Callable<String>

	private volatile int i;
	Task(int i)
		this.i = i;
	
	@Override
	public String call() throws Exception 
		Thread.sleep(1000l);
		return Thread.currentThread().getName() + "任务 :" +i;
	
	
执行结果:



源码分析:

类的定义,类头上已经清晰的解释了类的作用

* void solve(Executor e,
 *            Collection<Callable<Result>> solvers)
 *     throws InterruptedException 
 *     CompletionService<Result> ecs
 *         = new ExecutorCompletionService<Result>(e);
 *     int n = solvers.size();
 *     List<Future<Result>> futures
 *         = new ArrayList<Future<Result>>(n);
 *     Result result = null;
 *     try 
 *         for (Callable<Result> s : solvers)
 *             futures.add(ecs.submit(s));
 *         for (int i = 0; i < n; ++i) 
 *             try 
 *                 Result r = ecs.take().get();
 *                 if (r != null) 
 *                     result = r;
 *                     break;
 *                 
 *              catch (ExecutionException ignore) 
 *         
 *     
 *     finally 
 *         for (Future<Result> f : futures)
 *             f.cancel(true);
 *     
 *
 *     if (result != null)
 *         use(result);
 * </pre>
 */
public class ExecutorCompletionService<V> implements CompletionService<V> 
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;

    /**
     * FutureTask extension to enqueue upon completion
     */
    private class QueueingFuture extends FutureTask<Void> 
        QueueingFuture(RunnableFuture<V> task) 
            super(task, null);
            this.task = task;
        
        protected void done()  completionQueue.add(task); 
        private final Future<V> task;
    

构造方法:构造executor和blockqueue

 /**
     * Creates an ExecutorCompletionService using the supplied
     * executor for base task execution and a
     * @link LinkedBlockingQueue as a completion queue.
     *
     * @param executor the executor to use
     * @throws NullPointerException if executor is @code null
     */
    public ExecutorCompletionService(Executor executor) 
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    

    /**
     * Creates an ExecutorCompletionService using the supplied
     * executor for base task execution and the supplied queue as its
     * completion queue.
     *
     * @param executor the executor to use
     * @param completionQueue the queue to use as the completion queue
     *        normally one dedicated for use by this service. This
     *        queue is treated as unbounded -- failed attempted
     *        @code Queue.add operations for completed tasks cause
     *        them not to be retrievable.
     * @throws NullPointerException if executor or completionQueue are @code null
     */
    public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) 
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    

执行submit方法:这里可以看出 构造了一个QueueingFuture对象,感觉这个是整个机制的核心,看QueueingFuture的原码不难发现,他继承的是FurtrueTask对象,重写了done()方法(将excutor执行的future存放在queue中),FutureTask 继承了Futrue和Runnable接口。可以理解FutureTask就是AES的单线程版本,但是不具有提交线程的能力~~~

 public Future<V> submit(Callable<V> task) 
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    

然后我们看看这个done方法是在哪里被调用执行的:

FutureTask接收的都是Callable对象,如果是Runnable对象也将其转换成Callable。Callable的执行底层还是执行走的Runnable

Executors类的一个内部类

  /**
     * A callable that runs given task and returns given result
     */
    static final class RunnableAdapter<T> implements Callable<T> 
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) 
            this.task = task;
            this.result = result;
        
        public T call() 
            task.run();
            return result;
        
    


FutureTask的run方法:

 public void run() 
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try 
            Callable<V> c = callable;
            if (c != null && state == NEW) 
                V result;
                boolean ran;
                try 
                    result = c.call();
                    ran = true;
                 catch (Throwable ex) 
                    result = null;
                    ran = false;
                    setException(ex);
                
                if (ran)
                    set(result);
            
         finally 
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        
    


其中set(result)方法和setException方法调用了done方法,所以在线程执行完成后会间接调用 done方法,然后将线程执行返回的future对象放在blockqueue中





个人见解:

ExecutorCompletionService 这个类的设计还是有问题的,可能不是Doug Lea写的缘故,没有达到应有的抽象,感觉封装的很邋遢。


感觉对CompletionService的定义很模糊,既包含ExecuterService的submit方法又包含BlockQueue的对应放啊。感觉还能在外围对他进行抽象,让其继承ExecuterService和BlockQueue,作者也有可能是觉得这样实现这个接口代价太大的缘故。。但是太大也可以让抽象实现一部分方法,然后继承抽象哈哈~~


再说说ExecuterCompletionService吧,既包含了ExxecutorService有包含了AES,还包含了BlockQueue。。。更是觉得得抽象了。。。





能力有限,只能成为吐槽者,不能成为真正的实现者。。。






以上是关于java中的ExecutorCompletionService原理解析的主要内容,如果未能解决你的问题,请参考以下文章

mySQL在java中的应用

Java中的ArrayList 重要方法补充

Java中的Math函数

使用java 8中的forEach(..)而不是java 5中的forEach循环的任何优势[重复]

Java中的数据类型

java - 为啥在java中的poll方法之后PriorityQueue中的值会发生变化? [复制]