Java线程之CompletionService

Posted hello---word

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java线程之CompletionService相关的知识,希望对你有一定的参考价值。

背景

当我们需要同时处理一批任务时,并需要在任务完成时,可以获得任务的结果时,我们该怎么办呢。

  • 第一种方案是:保存每一个任务关联的Future,然后主线程遍历每一个Future进行get,由于get会阻塞,我们只能设置timeot为0,但是这样会有比较大的性能消耗。
  • 第二种方案:使用阻塞队列,每一个任务线程作为生产者在处理完成后,将结果put到阻塞队列。主线程作为消费者直接从阻塞队列take就好了,如果没有内容就阻塞。
  • 第三种方案:直接使用CompletionService,具体的实现类是ExecutorCompletionService。

ExecutorCompletionService

代码示例

public class CompletionServiceDemo implements Callable<Integer> 
    Random r = new Random();
    public static void main(String[] args) throws ExecutionException, InterruptedException 
        new CompletionServiceDemo().process();
    

    public void process() throws InterruptedException, ExecutionException 
        Executor executor = Executors.newFixedThreadPool(2);
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
        int taskCount = 5;
        for (int i = 0; i < taskCount; i++) 
            completionService.submit(this);
        
        int sum = 0;
        for (int i = 0; i < taskCount; i++) 
            Integer res = completionService.take().get();
            sum += res;
        
        System.out.println(sum);

    
    @Override
    public Integer call() throws Exception 
        int i = r.nextInt(500);
        Thread.sleep(i);
        return i;
    

 

实现原理

ExecutorCompletionService封装了Executor和BlockingQueue,。首先提交Callable任务到executor,然后将任务封装成QueueingFuture,它是FutureTask子类,然后重写done方法,即在线程执行完将Future对象置入BlockingQueue中。done方法是个回调方法,当FutureTask执行完后会设置result对象,然后就会回调done方法。take和poll方法委托给了 BlockingQueue,它会在结果不可用时阻塞。这样哪个任务先执行完,就能先获得哪个任务对应的结果了

部分源码

public class ExecutorCompletionService<V> implements CompletionService<V> 
    private final Executor executor;
    private final BlockingQueue<Future<V>> completionQueue;
    ...
    private class QueueingFuture extends FutureTask<Void> 
        QueueingFuture(RunnableFuture<V> task) 
            super(task, null);
            this.task = task;
        
        protected void done()  completionQueue.add(task); 
        private final Future<V> task;
    
    ...
    public Future<V> submit(Callable<V> task) 
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    
    ....
    public Future<V> take() throws InterruptedException 
        return completionQueue.take();
    
    ...

 

以上是关于Java线程之CompletionService的主要内容,如果未能解决你的问题,请参考以下文章

Java--多线程之join,yield,sleep;线程优先级;定时器;守护线程

JAVA开发知识之Java的线程

Java高并发专题之8线程组

Java--多线程之生产者消费者模式;线程池ExecutorService

Java多线程系列--“JUC线程池”02之 线程池原理

Java--多线程之synchronized和lock;死锁