CompletableFuture异步任务的简单使用

Posted 默辨

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了CompletableFuture异步任务的简单使用相关的知识,希望对你有一定的参考价值。





写在前面:如果你是奔着标题来的,只想看CompletableFuture的使用,可以直接拉到第三节,查看具体的测试案例。如果你是想由浅入深的学习,建议按照顺序一点一点往下看


一、FutureTask

1、Runnable接口

提到Callable接口,就一定要提到实现线程的三种方式。第一种是继承Thread类,一种是实现Runnable接口,最后一种就是我们这里说到的实现Callable接口。前两者是比较常见的实现多线程的方式,但是它们都有一个致命的问题,那就是没法获取返回值。



使用继承Thread类的方式实现多线程,其本质和实现Runnable接口相同,通过观察Thread的构造方法不难发现,它传入的参数target就是实现了Runnable接口

public Thread(ThreadGroup group, Runnable target, String name, long stackSize) 
    init(group, target, name, stackSize);

然后调用Runnable接口的run方法,最终达到和实现Runnable接口相同的目的。

@Override
public void run() 
    if (target != null) 
        target.run();
    




2、Callable接口

于是在Java 1.5就提供了Callable接口来实现这一场景,而Future和Future Task就可以和Callable接口配合起来使用。

Runnable

@FunctionalInterface
public interface Runnable 
    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see     java.lang.Thread#run()
     */
    public abstract void run();



Callable

@FunctionalInterface
public interface Callable<V> 
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;



观察对应的接口,我们可以很直观的得到两个点:

  • 不能返回一个返回值
  • 不能抛出Exception

想要使用Callable接口实现多线程,就需要和Future类配合,通过Future可以了解任务执行情况,或者取消任务的执行,还可获取任务执行的结果,这些功能都是Runnable做不到的,Callable的功能要比Runnable强大。




3、Future接口

Future就是对于具体的Runnable或者Callable任务(因为Future接口的实现类FutureTask既可以接受Runnable接口的参数进行实例化,也可以接收Callable接口的参数进行实例化)的执行结果进行取消、查询是否完成、获取结果。

必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。



4、代码测试案例

业务类

public class Task implements Callable<String> 
	private String taskName;

	public Task() 
	

	public Task(String taskName) 
		this.taskName = taskName;
	

	@Override
	public String call() throws Exception 
		Random ra = new Random();
		// 业务执行时间
		int time = ra.nextInt(10);
		System.out.println(this.taskName + "需要执行:" + time + " s");
		TimeUnit.SECONDS.sleep(time);
		return time + "s的" + this.taskName + "执行返回";
	



测试案例一:使用线程池执行五个多线程任务,不获取返回结果

代码实现思路:

1、使用Callable实现多线程,所以一定要有一个实现了Callable的任务类

2、由于Future只是一个接口,所以在实现多线程的时候需要借助其实现类FutureTask

3、将实现Callable接口的任务封装成FutureTask类的对象

4、交由线程池执行(线程池能够接收实现Runnable接口和Callable接口的对象)

5、记得关闭线程池资源(公共线程池除外)

public class FutureTaskTest01 
	public static void main(String[] args) throws ExecutionException, InterruptedException 
		
        long startTime = System.currentTimeMillis();
		FutureTask<String> task1 = new FutureTask<String>(new Task("任务1"));
		FutureTask<String> task2 = new FutureTask<String>(new Task("任务2"));
		FutureTask<String> task3 = new FutureTask<String>(new Task("任务3"));
		FutureTask<String> task4 = new FutureTask<String>(new Task("任务4"));
		FutureTask<String> task5 = new FutureTask<String>(new Task("任务5"));

		ExecutorService executor = Executors.newFixedThreadPool(5);
		executor.submit(task1);
		executor.submit(task2);
		executor.submit(task3);
		executor.submit(task4);
		executor.submit(task5);

		executor.shutdownNow();
		long endTime = System.currentTimeMillis();

		System.out.println("任务执行总时间:" + (endTime - startTime)+" ms");
	


返回结果:

不难发现执行任务时是非阻塞式的



测试案例二:获取返回结果(获取实现Callable接口的返回结果时,一定要区别与实现Runnable接口的方式的submit和execute方法,此处只需要调用对应的FutureTask任务的get方法即可,前文有解释)

public class FutureTaskTest01 
	public static void main(String[] args) throws ExecutionException, InterruptedException 

		long startTime = System.currentTimeMillis();


		FutureTask<String> task1 = new FutureTask<String>(new Task("任务1"));
		FutureTask<String> task2 = new FutureTask<String>(new Task("任务2"));
		FutureTask<String> task3 = new FutureTask<String>(new Task("任务3"));
		FutureTask<String> task4 = new FutureTask<String>(new Task("任务4"));
		FutureTask<String> task5 = new FutureTask<String>(new Task("任务5"));

		ExecutorService executor = Executors.newFixedThreadPool(5);
		executor.submit(task1);
		executor.submit(task2);
		executor.submit(task3);
		executor.submit(task4);
		executor.execute(task5);

		System.out.println(task1.get());
		System.out.println(task2.get());
		System.out.println(task3.get());
		System.out.println(task4.get());
		System.out.println(task5.get());
		// 关闭线程池
		executor.shutdownNow();
		long endTime = System.currentTimeMillis();

		System.out.println("任务执行总时间:" + (endTime - startTime)+" ms");
	

测试结果:

不难发现,调用get方法获取返回结果是阻塞式的





5、总结

Future 注意事项

  • 当 for 循环批量获取 Future 的结果时容易 block,get 方法调用时应使用 timeout 限制
  • Future 的生命周期不能后退。一旦完成了任务,它就永久停在了“已完成”的状态,不能从头再来



Future的局限性

从本质上说,Future表示一个异步计算的结果。它提供了isDone()来检测计算是否已经完成,并且在计算结束后,可以通过get()方法来获取计算结果。在异步计算中,Future确实是个非常优秀的接口。但是,它的本身也确实存在着许多限制:

  • 并发执行多任务:Future只提供了get()方法来获取结果,并且是阻塞的;
  • 无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如汇总数据,但Future却没有提供这样的能力;
  • 无法组合多个任务:如果你运行了5个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的;
  • 没有异常处理:Future接口中没有关于异常处理的方法,这就会导致外层无法感知内部的处理情况;






二、CompletionService

Callable + Future 可以实现多个task并行执行,但是如果遇到前面的task执行较慢时,需要阻塞等待前面的task执行完后面task才能取得结果(即调用get方法获取返回结果是阻塞式的)。而CompletionService的主要功能就是一边生成任务,一边获取任务的返回值。让两件事分开执行,任务之间不会互相阻塞,可以实现先执行完的先取结果,不再依赖任务顺序了。

内部通过阻塞队列 + FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序,内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果

1、代码测试案例

代码的实现方式上与FutureTask有略微的区别

代码实现思路:

1、创建一个线程池

2、将线程池封装为一个CompletionService对象

3、借助CompletionService对象来提交执行对应的多线程任务

4、调用CompletionService对象的take().get()方法获取返回结果

public class CompletionServiceTest01 
	public static void main(String[] args) throws InterruptedException, ExecutionException 

		long startTime = System.currentTimeMillis();
		ExecutorService executor = Executors.newFixedThreadPool(5);
		CompletionService<String> comple = new ExecutorCompletionService<>(executor);
		int num = 5;

		for (int i = 0; i < num; i++) 
			comple.submit(new Task("任务" + i));
		
		for (int i = 0; i < num; i++) 
			System.out.println(comple.take().get());
		

		long endTime = System.currentTimeMillis();
		System.out.println("总时间:" + (endTime - startTime));
	


测试结果:




2、源码实现原理

不难发现使用CompletionService来执行多线程任务时,调用get方法获取返回结果的时候不再是阻塞式的获取,而是任务执行完毕就直接返回。

前文有说过,这个非阻塞式的得到执行结果是借助队列实现的。当我们看到take方法,第一反应也应该马上想到队列的api。



源码逻辑:

1、当我们调用take方法获取返回结果时,会调用到ExecutorCompletionService类的take方法

2、take方法会去completionQueue队列中获取,而该队列就是一个存放Future的BlockingQueue

3、该队列默认是一个LinkedBlockingQueue

4、当然我们也可以自己指定一个对应的队列(比如ArrayBlockingQueue)

5、调用submit方式时,会初始化一个QueueingFuture对象

6、然后将对应的task任务,交给FutureTask类,完成对应任务的执行

7、当FutureTask任务执行完成后,会调用finishCompletion方法,该方法会调用done方法

8、done方法在QueueingFuture类中进行了重写,即完成将task任务添加到completionQueue队列中的目的

9、即调用take方法的时候,获取到的就是处理后的task(FutureTask),然后调用对应的get方法获取结果




应用场景总结

  • 当需要批量提交异步任务的时候建议使用CompletionService。CompletionService将线程池Executor和阻塞队列BlockingQueue的功能融合在了一起,能够让批量异步任务的管理更简单。
  • CompletionService能够让异步任务的执行结果有序化。先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如Forking Cluster这样的需求。
  • 线程池隔离。CompletionService支持自己创建线程池,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。






三、CompletableFuture

在学习这部分知识之前,一定要对lambda表达式有一定的基础,不然代码无论是看起来还是写起来都会晦涩难懂。也从侧面说出来,该工具方法是基于JDK8的新语法

简单的任务,用Future获取结果还好,但我们并行提交的多个异步任务,往往并不是独立的,很多时候业务逻辑处理存在串行[依赖]、并行、聚合的关系。如果要我们手动用 Fueture 实现,是非常麻烦的。

CompletableFuture是Future接口的扩展和增强。CompletableFuture实现了Future接口,并在此基础上进行了丰富地扩展,完美地弥补了Future上述的种种问题。更为重要的是,CompletableFuture实现了对任务的编排能力。借助这项能力,我们可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。



1、API描述

1)描述依赖关系:两个线程依次执行,放在一个CompletableFuture任务中

  1. thenApply():把前面异步任务的结果,交给后面的Function。有返回值。
  2. thenAccept():获取前一个线程的结果,消费。无返回值。
  3. thenRun():忽略前一个线程的结果,执行额外的逻辑。无返回值。
  4. whenComplete():获取前一个线程的结果或异常,进行消费。五印象上一个线程的返回值
  5. exceptionally():前面线程异常时,执行。一般跟whenComplete配合使用,即我们长江的捕获异常,有返回值。
  6. handle():相当于whenComplete()+exceptionally(),根据是否产生异常,内部进行if else分支处理。有返回值。
  7. thenCompose():用来连接两个有依赖关系的任务,结果由第二个任务返回。


2)描述and聚合关系:两个线程都要执行完毕,需要多个CompletableFuture任务

  1. thenCombine:任务合并,有返回值;
  2. thenAccepetBoth:两个任务执行完成后,将结果交给thenAccepetBoth消耗,无返回值;
  3. runAfterBoth:两个任务都执行完成后,执行下一步操作(Runnable)。


3)描述or聚合关系:两个线程任意一个执行完毕,需要多个CompletableFuture任务

  1. applyToEither:两个任务谁执行的快,就使用那一个结果,有返回值;
  2. acceptEither:两个任务谁执行的快,就消耗那一个结果,无返回值;
  3. runAfterEither:任意一个任务执行完成,进行下一步操作(Runnable)。



4)描述并行执行:等待执行完成

  1. anyOf():多个线程任一一个执行完毕即返回,有返回值。
  2. allOf():多个线程全部执行完毕才能会犯,无返回值。



5)异步执行对应的线程

  1. runAsync:开启一个异步任务,无返回值
  2. supplyAsync:开启一个异步粪污,有返回值



6)获取对应的结果

  1. join:获取返回结果,抛出的是uncheck异常(即未经检查的异常),不会强制开发者抛出
  2. get:获取返回结果,抛出的是经过检查的异常,ExecutionException, InterruptedException 需要用户手动处理(抛出或者 try catch)




2、API补充细节说明

创建异步操作

CompletableFuture 提供了四个静态方法来创建一个异步操作:

public static CompletableFuture<Void> runAsync(Runnable runnable);

public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);

这四个方法主要区别为:

  • runAsync 方法以Runnable函数式接口类型为参数,没有返回结果。supplyAsync 方法Supplier函数式接口类型为参数,返回结果类型为U,Supplier 接口的 get() 方法是有返回值的(会阻塞);
  • 如果我们没有指定对应的线程池,即实例化参数仅传入了task任务,那么Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。
  • 默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ;ForkJoinPool 线程池的线程数)。如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。




结果处理

当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的 Action。主要是下面的方法:

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action);

public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action);

public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor);

public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn);
  • Action的类型是BiConsumer<? super T,? super Throwable>,它可以处理正常的计算结果,或者异常情况。
  • 方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。
  • 这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常




结果转换

所谓结果转换,就是将上一段任务的执行结果作为下一阶段任务的入参参与重新计算,产生新的结果。

如:thenApply 接收一个函数作为参数,使用该函数处理上一个CompletableFuture 调用的结果,并返回一个具有处理结果的Future对象。

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);

又如:thenCompose 的参数为一个返回 CompletableFuture 实例的函数,该函数的参数是先前计算步骤的结果。
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);

public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;

public <U> CompletableFuture<U> thenComposeAsync(Function<? super以上是关于CompletableFuture异步任务的简单使用的主要内容,如果未能解决你的问题,请参考以下文章

CompletableFuture异步编排(两任务组合——其中一个任务执行)

CompletableFuture异步编排(两任务组合——其中一个任务执行)

CompletableFuture异步编排(两任务组合——其中一个任务执行)

CompletableFuture进阶

CompletableFuture异步编排(多任务组合)

CompletableFuture异步编排(多任务组合)