理解Java8里面CompletableFuture异步编程

Posted 我是攻城师

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了理解Java8里面CompletableFuture异步编程相关的知识,希望对你有一定的参考价值。

Java8主要的语言增强的能力有:

(1)lambda表达式

(2)stream式操作

(3)CompletableFuture

其中第三个特性,就是今天我们想要聊的话题,正是因为CompletableFuture的出现,才使得使用Java进行异步编程提供了可能。

什么是CompletableFuture?

CompletableFuture在Java里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过 回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。CompletableFuture实现了Future , CompletionStage 接口,实现了Future接口就可以兼容现在有线程池框架,而CompletionStage接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture类。

Future vs CompletableFuture

Futrue在Java里面,通常用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我们会得到一个Futrue,在Future里面有isDone方法来 判断任务是否处理结束,还有get方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。

Future的主要缺点如下:

(1)不支持手动完成

这个意思指的是,我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果,现在没法把这个任务结果,通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成。

(2)不支持进一步的非阻塞调用

这个指的是我们通过Future的get方法会一直阻塞到任务完成,但是我还想在获取任务之后,执行额外的任务,因为Future不支持回调函数,所以无法实现这个功能。

(3)不支持链式调用

这个指的是对于Future的执行结果,我们想继续传到下一个Future处理使用,从而形成一个链式的pipline调用,这在Future中是没法实现的。

(4)不支持多个Future合并

比如我们有10个Future并行执行,我们想在所有的Future运行完毕之后,执行某些函数,是没法通过Future实现的。

(5)不支持异常处理

Future的API没有任何的异常处理的api,所以在异步运行时,如果出了问题是不好定位的。

简单的使用CompletableFuture

1,先看一个最简单的例子

在主线程里面创建一个CompletableFuture,然后主线程调用get方法会阻塞,最后我们在一个子线程中 使其终止。

 
   
   
 
  1.        CompletableFuture<String> completableFuture=new CompletableFuture<String>();

  2.        Runnable runnable=new Runnable() {

  3.            @Override

  4.            public void run() {

  5.                try {

  6.                    TimeUnit.SECONDS.sleep(3);

  7.                    System.out.println(getThreadName()+"执行.....");

  8.                    completableFuture.complete("success");//在子线程中完成主线程completableFuture的完成

  9.                } catch (InterruptedException e) {

  10.                    e.printStackTrace();

  11.                }

  12.            }

  13.        };

  14.        Thread t1=new Thread(runnable);

  15.        t1.start();//启动子线程

  16.        String result=completableFuture.get();//主线程阻塞,等待完成

  17.        System.out.println(getThreadName()+"1x:  "+result);

输出结果:

 
   
   
 
  1. Thread-0线程=> 执行.....

  2. main线程=> 1x:  success

2,运行一个简单的没有返回值的异步任务

 
   
   
 
  1.        CompletableFuture<Void> future=CompletableFuture.runAsync(new Runnable() {

  2.            @Override

  3.            public void run() {

  4.                try {

  5.                    System.out.println(getThreadName()+"正在执行一个没有返回值的异步任务。");

  6.                    TimeUnit.SECONDS.sleep(2);

  7.                } catch (InterruptedException e) {

  8.                    e.printStackTrace();

  9.                }

  10.            }

  11.        });

  12.        future.get();

  13.        System.out.println(getThreadName()+" 结束。");

输出如下:

 
   
   
 
  1. ForkJoinPool.commonPool-worker-1线程=> 正在执行一个没有返回值的异步任务。

  2. main线程=>  结束。

从上面我们可以看到CompletableFuture默认运行使用的是ForkJoin的的线程池。当然,你也可以用lambda表达式使得代码更精简。

3,运行一个有返回值的异步任务

 
   
   
 
  1.        CompletableFuture<String> future=CompletableFuture.supplyAsync(new Supplier<String>() {

  2.            @Override

  3.            public String get() {

  4.                try {

  5.                    System.out.println(getThreadName()+"正在执行一个有返回值的异步任务。");

  6.                    TimeUnit.SECONDS.sleep(2);

  7.                } catch (InterruptedException e) {

  8.                    e.printStackTrace();

  9.                }

  10.                return "OK";

  11.            }

  12.        });

  13.        String result=future.get();

  14.        System.out.println(getThreadName()+"  结果:"+result);

输出结果:

 
   
   
 
  1. ForkJoinPool.commonPool-worker-1线程=> 正在执行一个有返回值的异步任务。

  2. main线程=>   结果:OK

当然,上面默认的都是ForkJoinPool我们也可以换成Executor相关的Pool,其api都有支持如下:

 
   
   
 
  1. static CompletableFuture<Void>  runAsync(Runnable runnable)

  2. static CompletableFuture<Void>  runAsync(Runnable runnable, Executor executor)

  3. static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

  4. static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

高级的使用CompletableFuture

前面提到的几种使用方法是使用异步编程最简单的步骤,CompletableFuture.get()的方法会阻塞直到任务完成,这其实还是同步的概念,这对于一个异步系统是不够的,因为真正的异步是需要支持回调函数,这样以来,我们就可以直接在某个任务干完之后,接着执行回调里面的函数,从而做到真正的异步概念。

在CompletableFuture里面,我们通过thenApply(), thenAccept(),thenRun()方法,来运行一个回调函数。

(1)thenApply()

这个方法,其实用过函数式编程的人非常容易理解,类似于scala和spark的map算子,通过这个方法可以进行多次链式转化并返回最终的加工结果。

看下面一个例子:

 
   
   
 
  1.    public static void asyncCallback() throws ExecutionException, InterruptedException {

  2.        CompletableFuture<String> task=CompletableFuture.supplyAsync(new Supplier<String>() {

  3.            @Override

  4.            public String get() {

  5.                System.out.println(getThreadName()+"supplyAsync");

  6.                return "123";

  7.            }

  8.        });

  9.        CompletableFuture<Integer> result1 = task.thenApply(number->{

  10.            System.out.println(getThreadName()+"thenApply1");

  11.            return Integer.parseInt(number);

  12.        });

  13.        CompletableFuture<Integer> result2 = result1.thenApply(number->{

  14.            System.out.println(getThreadName()+"thenApply2");

  15.            return number*2;

  16.        });

  17.        System.out.println(getThreadName()+" => "+result2.get());

  18.    }

输出结果:

 
   
   
 
  1. ForkJoinPool.commonPool-worker-1线程=> supplyAsync

  2. main线程=> thenApply1

  3. main线程=> thenApply2

  4. main线程=>  => 246

(2)thenAccept()

这个方法,可以接受Futrue的一个返回值,但是本身不在返回任何值,适合用于多个callback函数的最后一步操作使用。

例子如下:

 
   
   
 
  1.   public static void asyncCallback2() throws ExecutionException, InterruptedException {

  2.        CompletableFuture<String> task=CompletableFuture.supplyAsync(new Supplier<String>() {

  3.            @Override

  4.            public String get() {

  5.                System.out.println(getThreadName()+"supplyAsync");

  6.                return "123";

  7.            }

  8.        });

  9.        CompletableFuture<Integer> chain1 = task.thenApply(number->{

  10.            System.out.println(getThreadName()+"thenApply1");

  11.            return Integer.parseInt(number);

  12.        });

  13.        CompletableFuture<Integer> chain2 = chain1.thenApply(number->{

  14.            System.out.println(getThreadName()+"thenApply2");

  15.            return number*2;

  16.        });

  17.       CompletableFuture<Void> result=chain2.thenAccept(product->{

  18.           System.out.println(getThreadName()+"thenAccept="+product);

  19.       });

  20.        result.get();

  21.        System.out.println(getThreadName()+"end");

  22.    }

结果如下:

 
   
   
 
  1. ForkJoinPool.commonPool-worker-1线程=> supplyAsync

  2. main线程=> thenApply1

  3. main线程=> thenApply2

  4. main线程=> thenAccept=246

  5. main线程=> end

(3) thenRun()

这个方法与上一个方法类似,一般也用于回调函数最后的执行,但这个方法不接受回调函数的返回值,纯粹就代表执行任务的最后一个步骤:

 
   
   
 
  1.    public  static void asyncCallback3() throws ExecutionException, InterruptedException {

  2.        CompletableFuture.supplyAsync(()->{

  3.            System.out.println(getThreadName()+"supplyAsync: 一阶段任务");

  4.            return null;

  5.        }).thenRun(()->{

  6.            System.out.println(getThreadName()+"thenRun: 收尾任务");

  7.        }).get();

  8.    }

结果:

 
   
   
 
  1. ForkJoinPool.commonPool-worker-1线程=> supplyAsync: 一阶段任务

  2. main线程=> thenRun: 收尾任务

这里注意,截止到目前,前面的例子代码只会涉及两个线程,一个是主线程一个是ForkJoinPool池的线程,但其实上面的每一步都是支持异步运行的,其api如下:

 
   
   
 
  1. // thenApply() variants

  2. <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)

  3. <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)

  4. <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

我们看下改造后的一个例子:

 
   
   
 
  1.    public  static void asyncCallback4() throws ExecutionException, InterruptedException {

  2.        CompletableFuture<String> ref1=  CompletableFuture.supplyAsync(()->{

  3.            try {

  4.                System.out.println(getThreadName()+"supplyAsync开始执行任务1.... ");

  5. //                TimeUnit.SECONDS.sleep(1);

  6.            } catch (Exception e) {

  7.                e.printStackTrace();

  8.            }

  9.            System.out.println(getThreadName()+"supplyAsync: 任务1");

  10.            return null;

  11.        });

  12.        CompletableFuture<String> ref2= CompletableFuture.supplyAsync(()->{

  13.            try {

  14.            } catch (Exception e) {

  15.                e.printStackTrace();

  16.            }

  17.            System.out.println(getThreadName()+"thenApplyAsync: 任务2");

  18.            return null;

  19.        });

  20.        CompletableFuture<String> ref3=ref2.thenApplyAsync(value->{

  21.            System.out.println(getThreadName()+"thenApplyAsync: 任务2的子任务");

  22.            return  null;

  23.        });

  24.        Thread.sleep(4000);

  25.        System.out.println(getThreadName()+ref3.get());

  26.    }

输出结果如下:

 
   
   
 
  1. ForkJoinPool.commonPool-worker-1线程=> supplyAsync开始执行任务1....

  2. ForkJoinPool.commonPool-worker-1线程=> supplyAsync: 任务1

  3. ForkJoinPool.commonPool-worker-1线程=> supplyAsync: 任务2

  4. ForkJoinPool.commonPool-worker-2线程=> thenApplyAsync: 任务2的子任务

  5. main线程=> null

我们可以看到,ForkJoin池的线程1,执行了前面的三个任务,但是第二个任务的子任务,因为我们了使用也异步提交所以它用的线程是ForkJoin池的线程2,最终由于main线程处执行了get是最后结束的。

还有一点需要注意:

ForkJoinPool所有的工作线程都是守护模式的,也就是说如果主线程退出,那么整个处理任务都会结束,而不管你当前的任务是否执行完。如果需要主线程等待结束,可采用ExecutorsThreadPool,如下:

 
   
   
 
  1. ExecutorService pool = Executors.newFixedThreadPool(5);

  2. final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {

  3.                ... }, pool);

(4)thenCompose合并两个有依赖关系的CompletableFutures的执行结果

CompletableFutures在执行两个依赖的任务合并时,会返回一个嵌套的结果列表,为了避免这种情况我们可以使用thenCompose来返回,直接获取最顶层的结果数据即可:

 
   
   
 
  1.    public static void asyncCompose() throws ExecutionException, InterruptedException {

  2.        CompletableFuture<String>  future1=CompletableFuture.supplyAsync(new Supplier<String>() {

  3.            @Override

  4.            public String get() {

  5.                return "1";

  6.            }

  7.        });

  8.       CompletableFuture<String>nestedResult = future1.thenCompose(value->

  9.               CompletableFuture.supplyAsync(()->{

  10.                return value+"2";

  11.       }));

  12.        System.out.println(nestedResult.get());

  13.    }

(5)thenCombine合并两个没有依赖关系的CompletableFutures任务

 
   
   
 
  1.    CompletableFuture<Double>  d1= CompletableFuture.supplyAsync(new Supplier<Double>() {

  2.            @Override

  3.            public Double get() {

  4.                return 1d;

  5.            }

  6.        });

  7.        CompletableFuture<Double>  d2= CompletableFuture.supplyAsync(new Supplier<Double>() {

  8.            @Override

  9.            public Double get() {

  10.                return 2d;

  11.            }

  12.        });

  13.      CompletableFuture<Double> result=  d1.thenCombine(d2,(number1,number2)->{

  14.            return  number1+number2;

  15.        });

  16.        System.out.println(result.get());

(6)合并多个任务的结果allOf与anyOf

上面说的是两个任务的合并,那么多个任务需要使用allOf或者anyOf方法。

allOf适用于,你有一系列独立的future任务,你想等其所有的任务执行完后做一些事情。举个例子,比如我想下载100个网页,传统的串行,性能肯定不行,这里我们采用异步模式,同时对100个网页进行下载,当所有的任务下载完成之后,我们想判断每个网页是否包含某个关键词。

下面我们通过随机数来模拟上面的这个场景如下:

 
   
   
 
  1.    public static void mutilTaskTest() throws ExecutionException, InterruptedException {

  2.         //添加n个任务

  3.        CompletableFuture<Double> array[]=new CompletableFuture[3];

  4.        for ( int i = 0; i < 3; i++) {

  5.            array[i]=CompletableFuture.supplyAsync(new Supplier<Double>() {

  6.                @Override

  7.                public Double get() {

  8.                    return Math.random();

  9.                }

  10.            });

  11.        }

  12.       //获取结果的方式一

  13. //       CompletableFuture.allOf(array).get();

  14. //        for(CompletableFuture<Double> cf:array){

  15. //            if(cf.get()>0.6){

  16. //                System.out.println(cf.get());

  17. //            }

  18. //        }

  19.        //获取结果的方式二,过滤大于指定数字,在收集输出

  20.       List<Double> rs= Stream.of(array).map(CompletableFuture::join).filter(number->number>0.6).collect(Collectors.toList());

  21.       System.out.println(rs);

  22.    }

结果如下:

 
   
   
 
  1. [0.8228784717152199]

注意其中的join方法和get方法类似,仅仅在于在Future不能正常完成的时候抛出一个unchecked的exception,这可以确保它用在Stream的map方法中,直接使用get是没法在map里面运行的。

anyOf方法,也比较简单,意思就是只要在多个future里面有一个返回,整个任务就可以结束,而不需要等到每一个future结束。

 
   
   
 
  1.        CompletableFuture<String> f1=CompletableFuture.supplyAsync(new Supplier<String>() {

  2.            @Override

  3.            public String get() {

  4.                try {

  5.                    TimeUnit.SECONDS.sleep(4);

  6.                } catch (InterruptedException e) {

  7.                    e.printStackTrace();

  8.                }

  9.                return "wait 4 seconds";

  10.            }

  11.        });

  12.        CompletableFuture<String> f2=CompletableFuture.supplyAsync(new Supplier<String>() {

  13.            @Override

  14.            public String get() {

  15.                try {

  16.                    TimeUnit.SECONDS.sleep(2);

  17.                } catch (InterruptedException e) {

  18.                    e.printStackTrace();

  19.                }

  20.                return "wait 2 seconds";

  21.            }

  22.        });

  23.        CompletableFuture<String> f3=CompletableFuture.supplyAsync(new Supplier<String>() {

  24.            @Override

  25.            public String get() {

  26.                try {

  27.                    TimeUnit.SECONDS.sleep(4);

  28.                } catch (InterruptedException e) {

  29.                    e.printStackTrace();

  30.                }

  31.                return "wait 10 seconds";

  32.            }

  33.        });

  34.       CompletableFuture<Object> result= CompletableFuture.anyOf(f1,f2,f3);

  35.        System.out.println(result.get());

输出结果:

 
   
   
 
  1. wait 2 seconds

注意由于Anyof返回的是其中任意一个Future所以这里没有明确的返回类型,统一使用Object接受,留给使用端处理。

(7)exceptionally异常处理

异常处理是异步计算的一个重要环节,下面看看如何在CompletableFuture中使用:

 
   
   
 
  1.        int age=-1;

  2.       CompletableFuture<String> task= CompletableFuture.supplyAsync(new Supplier<String>() {

  3.           @Override

  4.           public String get() {

  5.               if(age<0){

  6.                   throw new IllegalArgumentException("性别必须大于0");

  7.               }

  8.               if(age<18){

  9.                   return "未成年人";

  10.               }

  11.               return "成年人";

  12.           }

  13.       }).exceptionally(ex->{

  14.           System.out.println(ex.getMessage());

  15.           return "发生 异常"+ex.getMessage();

  16.       });

  17.        System.out.println(task.get());

结果如下:

 
   
   
 
  1. java.lang.IllegalArgumentException: 性别必须大于0

  2. 发生 异常java.lang.IllegalArgumentException: 性别必须大于0

此外还有另外一种异常捕捉方法handle,无论发生异常都会执行,示例如下:

 
   
   
 
  1.        int age=10;

  2.        CompletableFuture<String> task= CompletableFuture.supplyAsync(new Supplier<String>() {

  3.            @Override

  4.            public String get() {

  5.                if(age<0){

  6.                    throw new IllegalArgumentException("性别必须大于0");

  7.                }

  8.                if(age<18){

  9.                    return "未成年人";

  10.                }

  11.                return "成年人";

  12.            }

  13.        }).handle((res,ex)->{

  14.            System.out.println("执行handle");

  15.            if(ex!=null){

  16.                System.out.println("发生异常");

  17.                return "发生 异常"+ex.getMessage();

  18.            }

  19.            return res;

  20.        });

  21.        System.out.println(task.get());

输出结果:

 
   
   
 
  1. 执行handle

  2. 发生异常

  3. 发生 异常java.lang.IllegalArgumentException: 性别必须大于0

注意上面的方法如果正常执行,也会执行handle方法。

JDK9 CompletableFuture 类增强的主要内容

(1)支持对异步方法的超时调用

 
   
   
 
  1. orTimeout()

  2. completeOnTimeout()

(2)支持延迟调用

 
   
   
 
  1. Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)

  2. Executor delayedExecutor(long delay, TimeUnit unit)

详细内容,可以参考Oracle官网文档,这里不再过多介绍。

总结:

本文主要介绍了CompletableFuture的定义,概念及在Java中使用的例子,通过CompletableFuture我们可以实现异步编程的能力,从而使得我们开发的任务可以拥有更强大的能力。


以上是关于理解Java8里面CompletableFuture异步编程的主要内容,如果未能解决你的问题,请参考以下文章

java代码之美(14)---Java8 函数式接口

java代码之美---Java8 函数式接口

JAVA8给我带了什么——流(入门)

Java8-11的新特性和理解的误区

理解HashMap源码

java8里面lambda的stream()用法讲解