JUC系列Executor框架之CompletionService

Posted 顧棟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC系列Executor框架之CompletionService相关的知识,希望对你有一定的参考价值。

【JUC系列】Executor框架之CompletionService

文章目录

需要优先阅读【JUC系列】Executor框架之FutureTask

CompletionService

这是一个接口,提供一种服务,它将新的异步任务的生产与已完成任务的结果的消费分离。生产者提交任务以供执行。消费者接受已完成的任务并按照他们完成的顺序处理他们的结果。

例如,CompletionService 可用于管理异步 I/O,其中执行读取的任务在程序或系统的一个部分中提交,然后在读取完成时在程序的不同部分执行操作,可能在与他们要求的顺序不同。通常,CompletionService 依赖一个单独的 Executor 来实际执行任务,在这种情况下,CompletionService 只管理一个内部完成队列。 ExecutorCompletionService 类提供了这种方法的实现。

内存一致性效果:在将任务提交到 CompletionService 之前线程中的操作发生在该任务所采取的操作之前,这反过来又发生在从相应的 take() 成功返回之后的操作。

接口提供了以下方法

方法名描述
Future submit(Callable task)提交一个返回值的任务以供执行,并返回一个表示该任务待处理结果的 Future。 完成后,可以采取或轮询此任务。
Future submit(Runnable task, V result)提交 Runnable 任务以执行并返回代表该任务的 Future。 完成后,可以take或poll此任务。
Future take() throws InterruptedException检索并删除代表下一个已完成任务的 Future,如果还没有,则等待。
Future poll()检索并删除表示下一个已完成任务的 Future,如果不存在,则返回 null。
Future poll(long timeout, TimeUnit unit) throws InterruptedException检索并删除代表下一个已完成任务的 Future,如果还没有,则在必要时等待指定的等待时间。

ExecutorCompletionService

ExecutorCompletionService实现了CompletionService。

一个 CompletionService,它使用提供的 Executor 来执行任务。 此类安排提交的任务在完成后放置在使用 take 可访问的队列中。 该类足够轻量级,适合在处理任务组时临时使用。

成员变量

// 执行任务的线程池
private final Executor executor;
// ???
private final AbstractExecutorService aes;
// 任务完成会存放在该阻塞队列中
private final BlockingQueue<Future<V>> completionQueue;

内部类QueueingFuture

继承了FutureTask

private class QueueingFuture extends FutureTask<Void> 
    QueueingFuture(RunnableFuture<V> task) 
        super(task, null);
        this.task = task;
    
    // 执行成功,将任务添加到completionQueue队列中。主要在FutureTask执行finishCompletion()时,调用done。
    protected void done()  completionQueue.add(task); 
    private final Future<V> task;

构造函数

参数executor:传入的线程池,用来执行任务;默认的队列是LinkedBlockingQueue。

public ExecutorCompletionService(Executor executor) 
    if (executor == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = new LinkedBlockingQueue<Future<V>>();

参数executor:传入的线程池,用来执行任务;

参数completionQueue:用来记录执行结果的阻塞列表

public ExecutorCompletionService(Executor executor,
                                 BlockingQueue<Future<V>> completionQueue) 
    if (executor == null || completionQueue == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = completionQueue;

任务执行

public Future<V> submit(Callable<V> task) 
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    // 将任务转换成QueueingFuture
    executor.execute(new QueueingFuture(f));
    return f;

public Future<V> submit(Runnable task, V result) 
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task, result);
    // 将任务转换成QueueingFuture 
    executor.execute(new QueueingFuture(f));
    return f;

ExecutorCompletionService的newTaskFor,将Callable或Runnable任务转成FutureTask。

若线程池不为AbstractExecutorService则,直接new FutureTask;若aes不为null,则调用AbstractExecutorService的newTaskFor。实际也是通过new FutureTask实现。

private RunnableFuture<V> newTaskFor(Callable<V> task) 
    if (aes == null)
        return new FutureTask<V>(task);
    else
        return aes.newTaskFor(task);


private RunnableFuture<V> newTaskFor(Runnable task, V result) 
    if (aes == null)
        return new FutureTask<V>(task, result);
    else
        return aes.newTaskFor(task, result);

AbstractExecutorService的newTaskFor

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) 
    return new FutureTask<T>(runnable, value);


protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) 
    return new FutureTask<T>(callable);

使用案例

场景一:

假设您有一组求解某个问题的求解器,每个求解器返回某种类型的值 Result,并希望同时运行它们,处理每个返回非空值的结果,在某些方法中使用(Result r)。 你可以这样写:

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;

public class ExecutorCompletionServiceDemo 
    public static void main(String[] args) 
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                5,
                10,
                3,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3),
                new ThreadPoolExecutor.AbortPolicy());

        CompletionService<Integer> completionService = new ExecutorCompletionService<>(threadPoolExecutor);
        System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] is beginning. ");

        List<Future<Integer>> futures = new ArrayList<>(3);
        futures.add(completionService.submit(() -> 
            Thread.sleep(3000);
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");
            return 1;
        ));
        futures.add(completionService.submit(() -> 
            Thread.sleep(2000);
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");
            return 2;
        ));
        futures.add(completionService.submit(() -> 
            Thread.sleep(1000);
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");
            return 3;
        ));

        Integer r = 0;
        try 
            for (int i = 0; i < 3; i++) 
                r = completionService.take().get();
                System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "]  " + r);
            
         catch (InterruptedException | ExecutionException e) 
            e.printStackTrace();
         finally 
        
        System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "]  is ending.");
        threadPoolExecutor.shutdown();
    

执行结果

[14:03:59:059--main] is beginning. 
[14:04:00:000--pool-1-thread-3] sleep is over. 
[14:04:00:000--main]  3
[14:04:01:001--pool-1-thread-2] sleep is over. 
[14:04:01:001--main]  2
[14:04:02:002--pool-1-thread-1] sleep is over. 
[14:04:02:002--main]  1
[14:04:02:002--main]  is ending.

场景二:

假设您想使用任务集的第一个非空结果,忽略任何遇到异常的任务,并在第一个任务准备好时取消所有其他任务:

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;

public class ExecutorCompletionServiceDemo 
    public static void main(String[] args) 
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                5,
                10,
                3,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3),
                new ThreadPoolExecutor.AbortPolicy());

        CompletionService<Integer> completionService = new ExecutorCompletionService<>(threadPoolExecutor);

        List<Future<Integer>> futures = new ArrayList<>(3);
        futures.add(completionService.submit(() -> 
            Thread.sleep(3000);
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");
            return 1;
        ));
        futures.add(completionService.submit(() -> 
            Thread.sleep(2000);
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");

            return 2;
        ));
        futures.add(completionService.submit(() -> 
            Thread.sleep(1000);
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");
            return 3;
        ));

        Integer r = 0;
        try 
            for (int i = 0; i < 3; i++) 
                r = completionService.take().get();
                // 如果结果获取到了,就退出for
                if (r != null) 
                    break;
                
            
         catch (InterruptedException | ExecutionException e) 
            e.printStackTrace();
         finally 
            for (Future<Integer> f : futures) 
                System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "]  " + f.isDone());
                f.cancel(true);
            
        
        System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(JUC系列Executor框架之概览

JUC系列Executor框架之CompletionFuture

JUC系列Executor框架之线程池执行器

Java 并发之 Executor 框架

Java 并发之 Executor 框架

Java并发专题之十juc-locks之线程池框架概述